亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

dask Client.map() 調用期間會發生什么?

dask Client.map() 調用期間會發生什么?

幕布斯7119047 2024-01-15 15:39:58
我正在嘗試使用 dask 編寫一個網格搜索實用程序。目標函數調用包含大量數據的類的方法。我正在嘗試使用 dask 將計算并行化為多核解決方案,而無需復制原始類/數據幀。我在文檔中沒有找到任何解決方案,因此我在這里發布一個玩具示例:import picklefrom dask.distributed import Client, LocalClusterfrom multiprocessing import current_processclass TestClass:    def __init__(self):        self.param = 0    def __getstate__(self):        print("I am pickled!")        return self.__dict__    def loss(self, ext_param):        self.param += 1        print(f"{current_process().pid}: {hex(id(self))}:  {self.param}: {ext_param} ")        return f"{self.param}_{ext_param}"def objective_function(param):    return test_instance.loss(param)if __name__ == '__main__':    test_instance = TestClass()    print(hex(id(test_instance)))    cluster = LocalCluster(n_workers=2)    client = Client(cluster)    futures = client.map(objective_function, range(20))    result = client.gather(futures)    print(result)    # ---- OUTPUT RESULTS ----# 0x7fe0a5056d30# I am pickled!# I am pickled!# 11347: 0x7fb9bcfa0588:  1: 0# 11348: 0x7fb9bd0a2588:  1: 1# 11347: 0x7fb9bcf94240:  1: 2# 11348: 0x7fb9bd07b6a0:  1: 3# 11347: 0x7fb9bcf945f8:  1: 4 # ['1_0', '1_1', '1_2', '1_3', '1_4']我有以下問題:為什么下面的 pickle 函數被調用兩次?我注意到 map 函數的每次迭代都使用 的新副本test_instance,正如您可以從每次迭代的不同類地址以及屬性test_instance.param在每次迭代時設置為 0 的事實中看到的那樣(此行為與我在這里強調的 multiprocessing.Pool 的標準實現不同)。我假設在每次迭代期間每個進程都會收到腌制類的新副本 - 這是正確的嗎?根據(2),test_instance計算期間內存中有多少個 的副本?是 1 (對于主線程中的原始實例)+ 1 (腌制副本)+ 2 (每個進程中存在的實例)= 4 嗎?有什么辦法可以讓這個值變成1嗎?我注意到,可以通過使用 Ray 庫來獲得一些共享內存解決方案,如本 github 問題中所建議的。
查看完整描述

1 回答

?
猛跑小豬

TA貢獻1858條經驗 獲得超8個贊

為什么下面的 pickle 函數被調用兩次?

通常,python 的 pickle 有效地將實例變量和對導入模塊中的類的引用捆綁在一起。在 中__main__,這可能不可靠,dask 回退到 cloudpickle(內部也調用 pickle)。在我看來,在第一次嘗試腌制之前可能會進行"__main__"檢查。distributed.protocol.pickle.dumps

在每次迭代期間,每個進程都會收到 pickled 類的新副本

是的。每次 dask 運行任務時,它都會反序列化輸入,創建實例的 nw 副本。請注意,您的 dask 工作線程可能是通過 fork_server 技術創建的,因此內存不是簡單地復制(這是執行操作的安全方法)。

您可以在計算之前將實例“分散”給工作人員,他們可以重用其本地副本,但 dask 任務不應該通過改變對象來工作,而是通過返回結果(即功能上)來工作。

內存中有多少個 test_instance 副本

客戶端中 1 個,加上每個正在執行的任務 1 個。序列化版本也可能存在,可能是保存在圖中的版本,暫時保存在客戶端,然后保存在調度程序上;在反序列化時它也會暫時存在于工作內存中。對于某些類型,零拷貝解/序列化是可能的。

如果由于對象的大小而導致任務非常大,那么您絕對應該事先“分散”它們(client.scatter)。

有什么辦法可以讓這個值變成1嗎?

您可以在進程中運行調度程序和/或工作線程來共享內存,但是,當然,您會失去與 GIL 的并行性。

查看完整回答
反對 回復 2024-01-15
  • 1 回答
  • 0 關注
  • 154 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號