亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

為什么在 python 多處理中關閉池之前不能使用 join()

為什么在 python 多處理中關閉池之前不能使用 join()

天涯盡頭無女友 2023-09-12 17:13:35
我有一個類,它有一個方法可以進行一些并行計算,并且經常被調用。因此,我希望我的池在類的構造函數中初始化一次,而不是每次調用此方法時都創建一個新池。在此方法中,我想使用 apply_async() 為所有工作進程啟動一個任務,然后等待(阻塞)并聚合每個任務的結果。我的代碼如下所示:class Foo:     def __init__(self, ...):         # ...         self.pool = mp.Pool(mp.cpu_count())     def do_parallel_calculations(self, ...):         for _ in range(mp.cpu_count()):              self.pool.apply_async(calc_func, args=(...), callback=aggregate_result)                  # wait for results to be aggregated to a global var by the callback         self.pool.join()  # <-- ValueError: Pool is still running                  # do something with the aggregated result of all worker processes但是,當我運行此命令時,我在 self.pool.join() 中收到錯誤消息:“ValueError:池仍在運行”?,F在,在所有示例中,我都看到 self.pool.close() 在 self.pool.join() 之前被調用,我認為這就是我收到此錯誤的原因,但我不想關閉我的池,因為我想要它在那里下次調用此方法時!我不能不使用 self.pool.join(),因為我需要一種方法來等待所有進程完成,并且我不想浪費地手動旋轉,例如使用“while not global_flag: pass”。我可以做什么來實現我想要做的事情?為什么多重處理不允許我加入仍然開放的池?這似乎是一件完全合理的事情。
查看完整描述

2 回答

?
一只斗牛犬

TA貢獻1784條經驗 獲得超2個贊

讓我們用一個真實的例子來具體說明這一點:


import multiprocessing as mp



def calc_func(x):

    return x * x



class Foo:

    def __init__(self):

        self.pool = mp.Pool(mp.cpu_count())


    def do_parallel_calculations(self, values):

        results = []

        for value in values:

            results.append(self.pool.apply_async(calc_func, args=(value,)))

        for result in results:

            print(result.get())


if __name__ == '__main__':

    foo = Foo()

    foo.do_parallel_calculations([1,2,3])


查看完整回答
反對 回復 2023-09-12
?
千巷貓影

TA貢獻1829條經驗 獲得超7個贊

我想我設法通過在 apply_async() 返回的 AsyncResult 對象上調用 get() 來做到這一點。所以代碼就變成了:


def do_parallel_calculations(self, ...):

     results = []

     for _ in range(mp.cpu_count()):

          results.append(self.pool.apply_async(calc_func, args=(...)))

     aggregated_result = 0

     for result in results:

          aggregated_result += result.get()

其中 calc_func() 返回單個任務結果,不需要回調和全局變量。


這并不理想,因為我以任意順序等待它們,而不是按照它們實際完成的順序(最有效的方法是減少結果),但由于我只有 4 個核心,所以幾乎不會被注意到。


查看完整回答
反對 回復 2023-09-12
  • 2 回答
  • 0 關注
  • 130 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號