亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

在獨立運行的 python 腳本之間共享 python 對象

在獨立運行的 python 腳本之間共享 python 對象

忽然笑 2022-07-05 19:04:56
這是我在這里的第一個問題,我希望我不會提出與已經存在的問題非常相似的問題。如果是這樣,請原諒我!因此,我遇到了一些麻煩的情況如下:我想并行運行獨立的 python 腳本,它可以訪問相同的 python 對象,在我的例子中是 Pandas Dataframe。我的想法是,一個腳本基本上一直在運行并訂閱一個數據流(這里是通過 websocket 推送的數據),然后將其附加到一個共享的 Dataframe 中。第二個腳本應該能夠獨立于第一個腳本啟動,并且仍然可以訪問由第一個腳本不斷更新的 Dataframe。在第二個腳本中,我想在預定義的時間間隔內執行不同類型的分析,或者對實時數據執行其他相對時間密集的操作。我已經嘗試從一個腳本中運行所有操作,但我一直與 websocket 斷開連接。將來還會有多個腳本可以實時訪問共享數據。與其在腳本 1 中的每次更新后保存一個 csv 文件或 pickle,我寧愿有一個解決方案,兩個腳本基本上共享相同的內存。我也只需要一個腳本來寫入并附加到 Dataframe,另一個只需要從中讀取。多處理模塊似乎有一些有趣的功能,這可能會有所幫助,但到目前為止我還沒有真正理解它。我還查看了全局變量,但這似乎也不是在這種情況下使用的正確方法。我想象的是這樣的(我知道,代碼完全錯誤,這只是為了說明目的):第一個腳本應繼續將數據流中的新數據分配給數據幀并共享此對象。from share_data import shareshared_df = pd.DataFrame()for data from datastream:        shared_df.append(data)        share(shared_df)然后第二個腳本應該能夠執行以下操作:from share_data import getdf = get(shared_df)這完全有可能嗎?或者您對如何以簡單的方式完成此任務有任何想法嗎?或者您有什么建議可以使用哪些軟件包?
查看完整描述

1 回答

?
茅侃侃

TA貢獻1842條經驗 獲得超22個贊

您已經非常清楚可以做什么來使用您的數據。

最佳解決方案取決于您的實際需求,因此我將嘗試通過一個可行的示例來介紹這些可能性。

你想要什么

如果我完全理解你的需要,你想

  • 不斷更新 DataFrame(來自 websocket)

  • 在對同一個 DataFrame 進行一些計算時

  • 使 DataFrame 在計算工作者上保持最新,

  • 一項計算是 CPU 密集型的

  • 另一個不是。

你需要什么

正如您所說,您將需要一種方法來運行不同的線程或進程以保持計算運行。

線程怎么樣

實現您想要的最簡單的方法是使用線程庫。由于線程可以共享內存,并且您只有一個工作人員實際更新 DataFrame,因此很容易提出一種方法來管理最新數據:

import time

from dataclasses import dataclass


import pandas

from threading import Thread



@dataclass

class DataFrameHolder:

    """This dataclass holds a reference to the current DF in memory.

    This is necessary if you do operations without in-place modification of

    the DataFrame, since you will need replace the whole object.

    """

    dataframe: pandas.DataFrame = pandas.DataFrame(columns=['A', 'B'])


    def update(self, data):

        self.dataframe = self.dataframe.append(data, ignore_index=True)



class StreamLoader:

    """This class is our worker communicating with the websocket"""


    def __init__(self, df_holder: DataFrameHolder) -> None:

        super().__init__()

        self.df_holder = df_holder


    def update_df(self):

        # read from websocket and update your DF.

        data = {

            'A': [1, 2, 3],

            'B': [4, 5, 6],

        }

        self.df_holder.update(data)


    def run(self):

        # limit loop for the showcase

        for _ in range(5):

            self.update_df()

            print("[1] Updated DF %s" % str(self.df_holder.dataframe))

            time.sleep(3)



class LightComputation:

    """This class is a random computation worker"""


    def __init__(self, df_holder: DataFrameHolder) -> None:

        super().__init__()

        self.df_holder = df_holder


    def compute(self):

        print("[2] Current DF %s" % str(self.df_holder.dataframe))


    def run(self):

        # limit loop for the showcase

        for _ in range(5):

            self.compute()

            time.sleep(5)



def main():

    # We create a DataFrameHolder to keep our DataFrame available.

    df_holder = DataFrameHolder()


    # We create and start our update worker

    stream = StreamLoader(df_holder)

    stream_process = Thread(target=stream.run)

    stream_process.start()


    # We create and start our computation worker

    compute = LightComputation(df_holder)

    compute_process = Thread(target=compute.run)

    compute_process.start()


    # We join our Threads, i.e. we wait for them to finish before continuing

    stream_process.join()

    compute_process.join()



if __name__ == "__main__":

    main()

請注意,我們使用一個類來保存當前 DataFrame 的引用,因為某些操作append不一定是就地的,因此,如果我們直接將引用發送到 DataFrame 對象,則修改將丟失在 worker 上。這里DataFrameHolder對象將在內存中保持相同的位置,因此工作人員仍然可以訪問更新的 DataFrame。


流程可能更強大

現在,如果您需要更多的計算能力,進程可能會更有用,因為它們使您能夠將您的工作人員隔離在不同的核心上。要在 python 中啟動進程而不是線程,可以使用多處理庫。兩個對象的 API 是相同的,你只需要改變構造函數如下


from threading import Thread

# I create a thread

compute_process = Thread(target=compute.run)



from multiprocessing import Process

# I create a process that I can use the same way

compute_process = Process(target=compute.run)

現在,如果您嘗試更改上述腳本中的值,您將看到 DataFrame 沒有正確更新。


為此,您將需要更多的工作,因為這兩個進程不共享內存,并且您有多種方式在它們之間進行通信(https://en.wikipedia.org/wiki/Inter-process_communication)


@SimonCrane 的參考在這方面非常有趣,并展示了使用multiprocessing.manager在兩個進程之間使用共享內存。


這是一個工人使用共享內存的單獨進程的版本:


import logging

import multiprocessing

import time

from dataclasses import dataclass

from multiprocessing import Process

from multiprocessing.managers import BaseManager

from threading import Thread


import pandas



@dataclass

class DataFrameHolder:

    """This dataclass holds a reference to the current DF in memory.

    This is necessary if you do operations without in-place modification of

    the DataFrame, since you will need replace the whole object.

    """

    dataframe: pandas.DataFrame = pandas.DataFrame(columns=['A', 'B'])


    def update(self, data):

        self.dataframe = self.dataframe.append(data, ignore_index=True)


    def retrieve(self):

        return self.dataframe



class DataFrameManager(BaseManager):

    """This dataclass handles shared DataFrameHolder.

    See https://docs.python.org/3/library/multiprocessing.html#examples

    """

    # You can also use a socket file '/tmp/shared_df'

    MANAGER_ADDRESS = ('localhost', 6000)

    MANAGER_AUTH = b"auth"


    def __init__(self) -> None:

        super().__init__(self.MANAGER_ADDRESS, self.MANAGER_AUTH)

        self.dataframe: pandas.DataFrame = pandas.DataFrame(columns=['A', 'B'])


    @classmethod

    def register_dataframe(cls):

        BaseManager.register("DataFrameHolder", DataFrameHolder)



class DFWorker:

    """Abstract class initializing a worker depending on a DataFrameHolder."""


    def __init__(self, df_holder: DataFrameHolder) -> None:

        super().__init__()

        self.df_holder = df_holder



class StreamLoader(DFWorker):

    """This class is our worker communicating with the websocket"""


    def update_df(self):

        # read from websocket and update your DF.

        data = {

            'A': [1, 2, 3],

            'B': [4, 5, 6],

        }

        self.df_holder.update(data)


    def run(self):

        # limit loop for the showcase

        for _ in range(4):

            self.update_df()

            print("[1] Updated DF\n%s" % str(self.df_holder.retrieve()))

            time.sleep(3)



class LightComputation(DFWorker):

    """This class is a random computation worker"""


    def compute(self):

        print("[2] Current DF\n%s" % str(self.df_holder.retrieve()))


    def run(self):

        # limit loop for the showcase

        for _ in range(4):

            self.compute()

            time.sleep(5)



def main():

    logger = multiprocessing.log_to_stderr()

    logger.setLevel(logging.INFO)


    # Register our DataFrameHolder type in the DataFrameManager.

    DataFrameManager.register_dataframe()

    manager = DataFrameManager()

    manager.start()

    # We create a managed DataFrameHolder to keep our DataFrame available.

    df_holder = manager.DataFrameHolder()


    # We create and start our update worker

    stream = StreamLoader(df_holder)

    stream_process = Thread(target=stream.run)

    stream_process.start()


    # We create and start our computation worker

    compute = LightComputation(df_holder)

    compute_process = Process(target=compute.run)

    compute_process.start()


    # The managed dataframe is updated in every Thread/Process

    time.sleep(5)

    print("[0] Main process DF\n%s" % df_holder.retrieve())


    # We join our Threads, i.e. we wait for them to finish before continuing

    stream_process.join()

    compute_process.join()



if __name__ == "__main__":

    main()

如您所見,線程和處理之間的差異非常小。


通過一些調整,如果您想使用不同的文件來處理 CPU 密集型處理,您可以從那里開始連接到同一個管理器。


查看完整回答
反對 回復 2022-07-05
  • 1 回答
  • 0 關注
  • 173 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號