亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

將任務添加到父級的多處理池

將任務添加到父級的多處理池

紫衣仙女 2023-06-06 17:27:45
如何將新任務添加到multiprocessing在父進程中初始化的池中?以下不起作用:from multiprocessing import Pooldef child_task(x):    # the child task spawns new tasks    results = p.map(grandchild_task, [x])    return results[0]def grandchild_task(x):    return xif __name__ == '__main__':    p = Pool(2)    print(p.map(child_task, [0]))    # Result: NameError: name 'p' is not defined動機:我需要并行化一個由各種子任務組成的程序,這些子任務本身也有子任務(即孫任務)。僅并行化子任務或孫任務不會利用我所有的 CPU 內核。在我的用例中,我有各種子任務(可能有 1-50 個),每個子任務有很多孫子任務(可能有 100-1000 個)。替代方案:如果使用 Python 的多處理包無法做到這一點,我很樂意切換到另一個支持它的庫。
查看完整描述

2 回答

?
鴻蒙傳說

TA貢獻1865條經驗 獲得超7個贊

有一個最小的可重現示例這樣的東西,然后除此之外還刪除了太多代碼,最終得到的東西 (1) 可能過于簡單化了,有危險而不是答案可能會錯過標記,并且 (2)不可能如圖所示運行(您需要將創建 Pool 和提交任務的代碼包含在由語句控制的塊中if __name__ == '__main__':。


但是根據您所展示的內容,我不認為 Pool 是適合您的解決方案;您應該根據需要創建 Process 實例。從進程中獲取結果的一種方法是將它們存儲在可共享的托管字典中,其鍵例如是創建結果的進程的進程 ID。


為了擴展您的示例,向子任務傳遞了兩個參數,x并且y需要作為結果返回x**2 + 'y**2。子任務將產生兩個孫任務實例,每個實例計算其參數的平方。然后,子任務將使用加法組合這些進程的返回值:


from multiprocessing import Process, Manager

import os



def child_task(results_dict, x, y):

    # the child task spawns new tasks

    p1 = Process(target=grandchild_task, args=(results_dict, x))

    p1.start()

    pid1 = p1.pid

    p2 = Process(target=grandchild_task, args=(results_dict, y))

    p2.start()

    pid2 = p2.pid

    p1.join()

    p2.join()

    pid = os.getpid()

    results_dict[pid] = results_dict[pid1] + results_dict[pid2]




def grandchild_task(results_dict, n):

    pid = os.getpid()

    results_dict[pid] = n * n



def main():

    manager = Manager()

    results_dict = manager.dict()

    p = Process(target=child_task, args=(results_dict, 2, 3))

    p.start()

    pid = p.pid

    p.join()

    # results will be stored with key p.pid:

    print(results_dict[pid])


if __name__ == '__main__':

    main()

印刷:


13

更新


例如,如果您確實遇到這樣一種情況,child_task需要處理 N 個相同的調用,只是參數不同,但它必須產生一兩個子進程,那么像以前一樣使用 Pool,但另外傳遞一個要使用的托管child_task字典用于產生額外的進程(不嘗試為此使用池)并檢索它們的結果。


更新 2


我能弄清楚子進程本身使用池的唯一方法是使用模塊ProcessPoolExecutor中的類concurrent.futures。當我試圖用 做同樣的事情時multiprocessing.Pool,我得到了一個錯誤,因為我們有守護進程試圖創建自己的進程。但即使在這里,唯一的方法是池中的每個進程都有自己的進程池。您的計算機上只有有限數量的處理器/內核,因此除非在處理中混合了一些 I/O,否則您可以創建所有這些池,但進程將等待運行的機會。因此,尚不清楚將實現什么樣的性能提升。還有關閉為child_task子進程創建的所有池的問題。通常一個ProcessPoolExecutor實例是使用with塊,當該塊終止時,將清理創建的池。但是child_task被重復調用并且顯然不能使用with塊,因為我們不希望不斷地創建和銷毀池。我來到這里有點麻煩:傳遞了第三個參數,True 或 False,指示是否child_task應該啟動其池的關閉。此參數的默認值為 False,我們甚至懶得傳遞它。在檢索到所有實際結果并且child_task進程現在空閑之后,我們提交 N 個具有虛擬值但shutdown設置為 True 的新任務。請注意,該ProcessPoolExecutor函數的map工作方式與類中的相同函數有很大不同Pool(閱讀文檔):


from concurrent.futures import ProcessPoolExecutor

import time



child_executor = None



def child_task(x, y, shutdown=False):

    global child_executor


    if child_executor is None:

        child_executor = ProcessPoolExecutor(max_workers=1)

    if shutdown:

        if child_executor:

            child_executor.shutdown(False)

            child_executor = None

            time.sleep(.2) # make sure another process in the pool gets the next task

        return None

    # the child task spawns new task(s)

    future = child_executor.submit(grandchild_task, y)

    # we can compute one of the results using the current process:

    return grandchild_task(x) + future.result()



def grandchild_task(n):

    return n * n



def main():

    N_WORKERS = 2

    with ProcessPoolExecutor(max_workers=N_WORKERS) as executor:

        # first call is (1, 2), second call is (3, 4):

        results = [result for result in executor.map(child_task, (1, 3), (2, 4))]

        print(results)

        # force a shutdown

        # need N_WORKERS invocations:

        [result for result in executor.map(child_task, (0,) * N_WORKERS, (0,) * N_WORKERS, (True,) * N_WORKERS)]



if __name__ == '__main__':

    main()

印刷:


[5, 25]


查看完整回答
反對 回復 2023-06-06
?
holdtom

TA貢獻1805條經驗 獲得超10個贊

檢查此解決方案:


#!/usr/bin/python

# requires Python version 3.8 or higher


from multiprocessing import Queue, Process

import time

from random import randrange

import os

import psutil



# function to be run by each child process

def square(number):

    sleep = randrange(5)

    time.sleep(sleep)

    print(f'Result is {number * number}, computed by pid {os.getpid()}...sleeping {sleep} secs')



# create a queue where all tasks will be placed

queue = Queue()


# indicate how many number of children you want the system to create to run the tasks

number_of_child_proceses = 5


# put all tasks in the queue above

for task in range(19):

    queue.put(task)



# this the main entry/start of the program when you run

def main():

    number_of_task = queue.qsize()

    print(f'{"_" * 60}\nBatch: {number_of_task // number_of_child_proceses + 1} \n{"_" * 60}')


    # don't create more number of children than the number of tasks. Also, in the last round, wait for all child process

    # to complete so as to wrap up everything

    if number_of_task <= number_of_child_proceses:

        processes = [Process(target=square, args=(queue.get(),)) for _ in

                     range(number_of_task)]

        for p in processes:

            p.start()

            p.join()


    else:

        processes = [Process(target=square, args=(queue.get(),)) for _ in range(number_of_child_proceses)]

        for p in processes:

            p.start()


    # update count of remaining task

    number_of_task = queue.qsize()


    # run the program in a loop until no more task remains in the queue

    while number_of_task:

        current_process = psutil.Process()

        children = current_process.children()


        # if children process have completed assigned task but there is still more remaining tasks in the queue,

        # assign them more tasks

        if not len(children) and number_of_task:

            print(f'\nAssigned tasks completed... reasigning the remaining {number_of_task} task(s) in the queue\n')

            main()


    # exit the loop if no more task in the queue to work on


    print('\nAll tasks completed!!')

    exit()



if __name__ == "__main__":

    main()



查看完整回答
反對 回復 2023-06-06
?
梵蒂岡之花

TA貢獻1900條經驗 獲得超5個贊

我環顧四周,找到了Ray,它使用嵌套的遠程函數解決了這個確切的用例。



查看完整回答
反對 回復 2023-06-06
  • 2 回答
  • 0 關注
  • 188 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號