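You already have a pretty clear picture of what you can do with your data.
The best solution depends on your actual requirements, so I'll try to walk through the possibilities with a working example.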
What you want
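If I understand your needs correctly, you want to:
- continuously update a DataFrame (from a websocket),
- while running some computations on that same DataFrame,
- keeping the DataFrame up to date on the computation workers,
- where one computation is CPU-intensive
- and the other one is not.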
What you need
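As you said, you will need a way to run separate threads or processes so that the computations keep running while the DataFrame is being updated.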
What about threads?
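The simplest way to achieve what you want is to use the threading library. Since threads share memory, and only one worker actually updates the DataFrame, it is easy to come up with a way to manage the latest data: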
```python
import time
from dataclasses import dataclass, field
from threading import Thread

import pandas


@dataclass
class DataFrameHolder:
    """This dataclass holds a reference to the current DF in memory.
    This is necessary if you do operations without in-place modification of
    the DataFrame, since you will need to replace the whole object.
    """
    # default_factory gives every holder its own empty DataFrame
    dataframe: pandas.DataFrame = field(
        default_factory=lambda: pandas.DataFrame(columns=['A', 'B'])
    )

    def update(self, data):
        new_rows = pandas.DataFrame(data)
        # DataFrame.append was removed in pandas 2.0; pandas.concat also returns
        # a new DataFrame, so the attribute is rebound. Starting from the new
        # rows while the frame is still empty keeps their dtypes.
        frames = [new_rows] if self.dataframe.empty else [self.dataframe, new_rows]
        self.dataframe = pandas.concat(frames, ignore_index=True)


class StreamLoader:
    """This class is our worker communicating with the websocket"""

    def __init__(self, df_holder: DataFrameHolder) -> None:
        self.df_holder = df_holder

    def update_df(self):
        # read from websocket and update your DF (dummy data for the showcase).
        data = {
            'A': [1, 2, 3],
            'B': [4, 5, 6],
        }
        self.df_holder.update(data)

    def run(self):
        # limit loop for the showcase
        for _ in range(5):
            self.update_df()
            print("[1] Updated DF %s" % str(self.df_holder.dataframe))
            time.sleep(3)


class LightComputation:
    """This class is a random computation worker"""

    def __init__(self, df_holder: DataFrameHolder) -> None:
        self.df_holder = df_holder

    def compute(self):
        print("[2] Current DF %s" % str(self.df_holder.dataframe))

    def run(self):
        # limit loop for the showcase
        for _ in range(5):
            self.compute()
            time.sleep(5)


def main():
    # We create a DataFrameHolder to keep our DataFrame available.
    df_holder = DataFrameHolder()

    # We create and start our update worker
    stream = StreamLoader(df_holder)
    stream_process = Thread(target=stream.run)
    stream_process.start()

    # We create and start our computation worker
    compute = LightComputation(df_holder)
    compute_process = Thread(target=compute.run)
    compute_process.start()

    # We join our Threads, i.e. we wait for them to finish before continuing
    stream_process.join()
    compute_process.join()


if __name__ == "__main__":
    main()
```
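Note that we use a class to hold a reference to the current DataFrame, because some operations, such as appending rows, are not in-place: they return a new DataFrame. If we passed the DataFrame object to the workers directly, each worker would keep a reference to the old object and the modifications would be lost on the worker. The DataFrameHolder object, on the other hand, stays the same object in memory, so every worker can still reach the updated DataFrame through it.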
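To make the rebinding issue concrete, here is a minimal sketch using only the standard library. It assumes a hypothetical `Holder` class whose `rows` attribute is a tuple: `rows + new_rows` builds a new tuple, much like `pandas.concat` builds a new DataFrame, so a reference taken before the update goes stale, while a worker reading through the holder sees the new data.

```python
import threading
from dataclasses import dataclass


@dataclass
class Holder:
    """Hypothetical stand-in for DataFrameHolder (a tuple instead of a DataFrame)."""
    rows: tuple = ()

    def update(self, new_rows):
        # Like pandas.concat, tuple concatenation builds a *new* object,
        # so the attribute is rebound instead of modified in place.
        self.rows = self.rows + tuple(new_rows)


def demo():
    holder = Holder()
    stale_ref = holder.rows       # what a worker gets if handed the data itself
    seen_by_worker = []

    def worker():
        # Reading through the holder always finds the current object.
        seen_by_worker.append(len(holder.rows))

    holder.update([1, 2, 3])      # rebinds holder.rows to a new tuple
    t = threading.Thread(target=worker)
    t.start()
    t.join()
    return len(stale_ref), seen_by_worker[0]
```

`demo()` returns `(0, 3)`: the stale reference still points at the empty tuple, while the worker sees all three rows.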
Processes can be more powerful
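Now, if you need more computing power, processes may be more useful, because they let you isolate your workers on different cores (in standard CPython builds, the GIL prevents threads from executing Python bytecode in parallel). To start a process instead of a thread in Python, you can use the multiprocessing library. Both objects have the same API; you only need to change the constructor as follows: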
```python
from threading import Thread
# Create a thread
compute_process = Thread(target=compute.run)

from multiprocessing import Process
# Create a process that can be used the same way
compute_process = Process(target=compute.run)
```
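Since both constructors take `target=` and the resulting objects share `start()` and `join()`, the choice can be made per worker. A minimal sketch, where `make_worker` is a hypothetical helper (not part of either library) that uses a process for the CPU-intensive computation and a thread otherwise:

```python
from multiprocessing import Process
from threading import Thread


def make_worker(target, cpu_bound: bool):
    """Hypothetical helper: a Process for CPU-bound work, a Thread otherwise.

    Both classes accept `target=` and expose start()/join(), so the caller
    can treat the returned object the same way in either case.
    """
    worker_cls = Process if cpu_bound else Thread
    return worker_cls(target=target)
```

For example, `make_worker(stream.run, cpu_bound=False)` for the websocket loader and `make_worker(compute.run, cpu_bound=True)` for the heavy computation. Swapping the class does not make memory shared, though: a `Process` works on its own copy of the data.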
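However, if you make this change in the script above, you will see that the DataFrame is no longer updated correctly in the computation worker.
Fixing this takes more work, because the two processes do not share memory, and there are several ways to communicate between them (https://en.wikipedia.org/wiki/Inter-process_communication).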
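The following minimal sketch shows the problem in isolation, with a plain list standing in for the DataFrame: the child process modifies its own copy, and the parent only gets the data if it is sent back explicitly. A `multiprocessing.Queue` is used here as just one example of IPC. The sketch assumes the `fork` start method (available on Linux and macOS, not on Windows) to keep it short.

```python
import multiprocessing as mp

# Assumption: the "fork" start method, so the child inherits a copy of the
# parent's memory and no re-import of this module is needed.
ctx = mp.get_context("fork")

shared_rows = []  # lives in the parent's memory


def child_work(queue):
    # The child appends to *its own copy* of shared_rows...
    shared_rows.extend([1, 2, 3])
    # ...so the data has to be handed back explicitly, here through a Queue.
    queue.put(list(shared_rows))


def run_demo():
    queue = ctx.Queue()
    p = ctx.Process(target=child_work, args=(queue,))
    p.start()
    from_child = queue.get()  # read before join() so the child can flush the queue
    p.join()
    return list(shared_rows), from_child
```

`run_demo()` returns `([], [1, 2, 3])`: the parent's list is untouched, and the rows only arrive through the queue.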
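@SimonCrane's reference is very interesting in this regard: it shows how to share data between two processes with multiprocessing.managers (strictly speaking, a manager keeps the object in a server process and the other processes access it through proxies, rather than through true shared memory).
Here is a version in which one worker runs in a separate process and uses a managed DataFrameHolder: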
```python
import logging
import multiprocessing
import time
from dataclasses import dataclass, field
from multiprocessing import Process
from multiprocessing.managers import BaseManager
from threading import Thread

import pandas


@dataclass
class DataFrameHolder:
    """This dataclass holds a reference to the current DF in memory.
    This is necessary if you do operations without in-place modification of
    the DataFrame, since you will need to replace the whole object.
    """
    dataframe: pandas.DataFrame = field(
        default_factory=lambda: pandas.DataFrame(columns=['A', 'B'])
    )

    def update(self, data):
        new_rows = pandas.DataFrame(data)
        # DataFrame.append was removed in pandas 2.0; pandas.concat returns a
        # new DataFrame, so the attribute is rebound.
        frames = [new_rows] if self.dataframe.empty else [self.dataframe, new_rows]
        self.dataframe = pandas.concat(frames, ignore_index=True)

    def retrieve(self):
        # Proxies expose methods, not attributes, hence this getter.
        return self.dataframe


class DataFrameManager(BaseManager):
    """This manager handles the shared DataFrameHolder.
    See https://docs.python.org/3/library/multiprocessing.html#examples
    """
    # You can also use a socket file '/tmp/shared_df'
    MANAGER_ADDRESS = ('localhost', 6000)
    MANAGER_AUTH = b"auth"

    def __init__(self) -> None:
        super().__init__(self.MANAGER_ADDRESS, self.MANAGER_AUTH)

    @classmethod
    def register_dataframe(cls):
        # Register on this subclass so BaseManager itself is left untouched.
        cls.register("DataFrameHolder", DataFrameHolder)


class DFWorker:
    """Base class initializing a worker depending on a DataFrameHolder."""

    def __init__(self, df_holder: DataFrameHolder) -> None:
        self.df_holder = df_holder


class StreamLoader(DFWorker):
    """This class is our worker communicating with the websocket"""

    def update_df(self):
        # read from websocket and update your DF (dummy data for the showcase).
        data = {
            'A': [1, 2, 3],
            'B': [4, 5, 6],
        }
        self.df_holder.update(data)

    def run(self):
        # limit loop for the showcase
        for _ in range(4):
            self.update_df()
            print("[1] Updated DF\n%s" % str(self.df_holder.retrieve()))
            time.sleep(3)


class LightComputation(DFWorker):
    """This class is a random computation worker"""

    def compute(self):
        print("[2] Current DF\n%s" % str(self.df_holder.retrieve()))

    def run(self):
        # limit loop for the showcase
        for _ in range(4):
            self.compute()
            time.sleep(5)


def main():
    logger = multiprocessing.log_to_stderr()
    logger.setLevel(logging.INFO)

    # Register our DataFrameHolder type in the DataFrameManager.
    DataFrameManager.register_dataframe()
    manager = DataFrameManager()
    manager.start()

    # We create a managed DataFrameHolder to keep our DataFrame available.
    df_holder = manager.DataFrameHolder()

    # We create and start our update worker
    stream = StreamLoader(df_holder)
    stream_process = Thread(target=stream.run)
    stream_process.start()

    # We create and start our computation worker
    compute = LightComputation(df_holder)
    compute_process = Process(target=compute.run)
    compute_process.start()

    # The managed dataframe is updated in every Thread/Process
    time.sleep(5)
    print("[0] Main process DF\n%s" % df_holder.retrieve())

    # We join our Threads, i.e. we wait for them to finish before continuing
    stream_process.join()
    compute_process.join()


if __name__ == "__main__":
    main()
```
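As you can see, the differences between using threads and using processes are very small.
With some tweaks, you could start from there to connect to the same manager from a different file, if you want to run the CPU-intensive processing separately.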
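Note that `manager.DataFrameHolder()` creates a new holder every time it is called, so a second script calling it would not see the first script's data. One possible tweak, sketched here under assumptions, follows the remote-manager pattern from the multiprocessing documentation: the serving side registers a function that always returns the same instance, and the other side registers only the type id and calls `connect()`. To keep the sketch self-contained and runnable in one file, `RowsHolder`, `start_server` and `connect` are hypothetical names, `RowsHolder` stores a list instead of a DataFrame, and the server runs in a background thread via `get_server().serve_forever()`. In practice the two halves would be separate scripts sharing the address and authkey.

```python
import threading
from multiprocessing.managers import BaseManager

AUTHKEY = b"auth"  # must match on both sides, like MANAGER_AUTH above


class RowsHolder:
    """Hypothetical stand-in for DataFrameHolder (a list instead of a DataFrame)."""

    def __init__(self):
        self.rows = []

    def update(self, new_rows):
        self.rows = self.rows + list(new_rows)

    def retrieve(self):
        return self.rows


# --- serving side: owns ONE holder and serves it ---------------------------
shared_holder = RowsHolder()


class ServerManager(BaseManager):
    pass


# Returning the same instance means every client gets a proxy to it,
# instead of a fresh RowsHolder per call.
ServerManager.register("get_holder", callable=lambda: shared_holder)


def start_server():
    manager = ServerManager(address=("127.0.0.1", 0), authkey=AUTHKEY)
    server = manager.get_server()  # binds the socket; port 0 picks a free port
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address


# --- CPU-heavy side: only knows the type id, the address and the key -------
class ClientManager(BaseManager):
    pass


ClientManager.register("get_holder")  # no callable: the server provides it


def connect(address):
    manager = ClientManager(address=address, authkey=AUTHKEY)
    manager.connect()
    return manager.get_holder()
```

After `address = start_server()`, two calls to `connect(address)` return proxies to the same holder, so an `update()` made through one is visible through `retrieve()` on the other.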