2 Answers

TA contributed 1865 experience points, received 7+ upvotes
There is such a thing as a minimal, reproducible example, and then there is deleting too much code, ending up with something that (1) is possibly over-simplified, with the danger that an answer might miss the mark, and (2) cannot possibly run as shown (you need to enclose the code that creates the Pool and submits tasks in a block governed by an if __name__ == '__main__': statement).
But based on what you have shown, I don't believe a Pool is the right solution for you; you should be creating Process instances as needed. One way to get results back from the processes is to store them in a shareable, managed dictionary whose key is, for example, the process id of the process that created the result.
To expand on your example, the child task is passed two arguments, x and y, and needs to return x**2 + y**2 as its result. The child task will spawn two grandchild task instances, each one computing the square of its argument. The child task will then combine the return values of these processes using addition:
from multiprocessing import Process, Manager
import os


def child_task(results_dict, x, y):
    # the child task spawns new tasks
    p1 = Process(target=grandchild_task, args=(results_dict, x))
    p1.start()
    pid1 = p1.pid
    p2 = Process(target=grandchild_task, args=(results_dict, y))
    p2.start()
    pid2 = p2.pid
    p1.join()
    p2.join()
    pid = os.getpid()
    results_dict[pid] = results_dict[pid1] + results_dict[pid2]


def grandchild_task(results_dict, n):
    pid = os.getpid()
    results_dict[pid] = n * n


def main():
    manager = Manager()
    results_dict = manager.dict()
    p = Process(target=child_task, args=(results_dict, 2, 3))
    p.start()
    pid = p.pid
    p.join()
    # results will be stored with key p.pid:
    print(results_dict[pid])


if __name__ == '__main__':
    main()
Prints:
13
Update
If, for example, you really have a situation where child_task needs to process N identical calls that vary only in their arguments, but it has to spawn one or two subprocesses, then use a Pool as before, but additionally pass a managed dictionary to child_task, to be used for spawning the additional processes (not attempting to use a pool for this) and retrieving their results.
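A rough sketch of that arrangement might look like the following. This is my own illustration, not code from the question; note that because multiprocessing.Pool workers are daemonic and may not start child processes themselves, the sketch uses concurrent.futures.ProcessPoolExecutor (whose workers are non-daemonic) for the outer pool:

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process, Manager
import os


def grandchild_task(results_dict, n):
    # store this process's result under its own pid
    results_dict[os.getpid()] = n * n


def child_task(results_dict, x, y):
    # spawn plain Process instances (not a pool) for the two subtasks
    p1 = Process(target=grandchild_task, args=(results_dict, x))
    p2 = Process(target=grandchild_task, args=(results_dict, y))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    # combine the grandchild results; the sum is returned through the pool
    return results_dict[p1.pid] + results_dict[p2.pid]


def main():
    with Manager() as manager:
        results_dict = manager.dict()
        with ProcessPoolExecutor(max_workers=2) as executor:
            futures = [executor.submit(child_task, results_dict, 2, 3),
                       executor.submit(child_task, results_dict, 4, 5)]
            print([future.result() for future in futures])  # [13, 41]


if __name__ == '__main__':
    main()
```

The managed dictionary proxy is picklable, so it can be passed both to the pool workers and to the Process instances they create.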
Update 2
The only way I was able to figure out for the subprocesses themselves to use a pool is by using the ProcessPoolExecutor class from the concurrent.futures module. When I attempted the same thing with multiprocessing.Pool, I got an error because we would have daemon processes trying to create their own processes. But even here, the only way is for each process in the pool to have its own pool of processes. You only have a finite number of processors/cores on your computer, so unless there is some I/O mixed into the processing, you can create all these pools, but the processes will just be waiting for a chance to run. It is therefore not clear what performance gains would be realized. There is also the problem of shutting down all the pools created for the child_task subprocesses. Normally a ProcessPoolExecutor instance is created using a with block, and when that block terminates the pool that was created is cleaned up. But child_task is invoked repeatedly and clearly cannot use a with block, because we do not want to be constantly creating and destroying pools. What I came up with is a bit of a kludge: a third parameter is passed, True or False, indicating whether child_task should initiate the shutdown of its pool. The default value for this parameter is False and we do not even bother passing it. After all the actual results have been retrieved and the child_task processes are idle, we submit N new tasks with dummy values but with shutdown set to True. Note that the map function of ProcessPoolExecutor works quite differently from the same-named function in the Pool class (read the docs):
from concurrent.futures import ProcessPoolExecutor
import time

child_executor = None


def child_task(x, y, shutdown=False):
    global child_executor
    if child_executor is None:
        child_executor = ProcessPoolExecutor(max_workers=1)
    if shutdown:
        if child_executor:
            child_executor.shutdown(False)
            child_executor = None
            time.sleep(.2)  # make sure another process in the pool gets the next task
        return None
    # the child task spawns new task(s)
    future = child_executor.submit(grandchild_task, y)
    # we can compute one of the results using the current process:
    return grandchild_task(x) + future.result()


def grandchild_task(n):
    return n * n


def main():
    N_WORKERS = 2
    with ProcessPoolExecutor(max_workers=N_WORKERS) as executor:
        # first call is (1, 2), second call is (3, 4):
        results = [result for result in executor.map(child_task, (1, 3), (2, 4))]
        print(results)
        # force a shutdown
        # need N_WORKERS invocations:
        [result for result in executor.map(child_task, (0,) * N_WORKERS, (0,) * N_WORKERS, (True,) * N_WORKERS)]


if __name__ == '__main__':
    main()
Prints:
[5, 25]
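For reference, here is a minimal sketch (my own illustration, not from the question) of the error mentioned above when a multiprocessing.Pool worker tries to start its own process: Pool workers run with daemon=True, so Process.start() raises an AssertionError, which pool.apply re-raises in the parent:

```python
from multiprocessing import Pool, Process


def grandchild_task(n):
    return n * n


def child_task(n):
    # Pool workers are daemonic, so starting a new Process here fails with
    # AssertionError: daemonic processes are not allowed to have children
    p = Process(target=grandchild_task, args=(n,))
    p.start()
    p.join()


def main():
    with Pool(1) as pool:
        try:
            pool.apply(child_task, (3,))
        except AssertionError as error:
            print('Error:', error)


if __name__ == '__main__':
    main()
```

This is why the example above switches to ProcessPoolExecutor, whose workers are not daemonic.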

TA contributed 1805 experience points, received 10+ upvotes
Check out this solution:
#!/usr/bin/python
# requires Python version 3.8 or higher
from multiprocessing import Queue, Process
import time
from random import randrange
import os
import psutil


# function to be run by each child process
def square(number):
    sleep = randrange(5)
    time.sleep(sleep)
    print(f'Result is {number * number}, computed by pid {os.getpid()}...sleeping {sleep} secs')


# create a queue where all tasks will be placed
queue = Queue()
# indicate how many child processes you want the system to create to run the tasks
number_of_child_proceses = 5
# put all tasks in the queue above
for task in range(19):
    queue.put(task)


# this is the main entry/start of the program when you run it
def main():
    number_of_task = queue.qsize()
    print(f'{"_" * 60}\nBatch: {number_of_task // number_of_child_proceses + 1} \n{"_" * 60}')
    # don't create more children than the number of tasks. Also, in the last round, wait for all child processes
    # to complete so as to wrap up everything
    if number_of_task <= number_of_child_proceses:
        processes = [Process(target=square, args=(queue.get(),))
                     for _ in range(number_of_task)]
        for p in processes:
            p.start()
            p.join()
    else:
        processes = [Process(target=square, args=(queue.get(),)) for _ in range(number_of_child_proceses)]
        for p in processes:
            p.start()
    # update count of remaining tasks
    number_of_task = queue.qsize()
    # run the program in a loop until no more tasks remain in the queue
    while number_of_task:
        current_process = psutil.Process()
        children = current_process.children()
        # if the child processes have completed their assigned tasks but there are still more tasks in the queue,
        # assign them more tasks
        if not len(children) and number_of_task:
            print(f'\nAssigned tasks completed... reassigning the remaining {number_of_task} task(s) in the queue\n')
            main()
    # exit once no more tasks remain in the queue to work on
    print('\nAll tasks completed!!')
    exit()


if __name__ == "__main__":
    main()