

An alternative to multiprocessing.map that does not store the function's return values


暮色呼如 2023-08-08 09:48:46
I usually use multiprocessing.imap_unordered to run a function concurrently, but my RAM usage keeps growing.

The problem is as follows: I have millions of combinations of data (created with itertools.product) that need to be passed to a function. The function computes a score with an SVM and then records the score together with the current combination. The function does not need to return anything; it only computes the score and stores it in a shared value. I don't care about all the other combinations, only the best one.

With imap_unordered, RAM usage keeps increasing until the program crashes from lack of memory. I suspect this happens because imap stores the results of the function: it returns nothing useful, but perhaps a None/Null value is kept anyway? Here is some example code:

    from functools import partial
    import itertools
    import multiprocessing
    import time

    def svm(input_data, params):
        # Copy the data to avoid changing the original data,
        # as input_data is a reference to a pandas dataframe
        # and I need to shift columns up and down.
        dataset = input_data.copy()

        # Use svm here to analyse data
        score = sum(dataset) + sum(params)  # simulate score of svm

        # Simulate a process that takes a bit of time
        time.sleep(0.5)

        return (score, params)

    if __name__ == "__main__":
        # Without this, multiprocessing gives an error on Windows
        multiprocessing.freeze_support()

        # Set the number of worker processes:
        # empty for all the cores, an int for that many processes
        pool = multiprocessing.Pool()

        # iterable settings
        total_combinations = 2
        total_features = 45

        # Keep track of best score
        best_score = -1000
        best_param = [0 for _ in range(total_features)]

        input_data = [x * x for x in range(10000)]

        # Create a partial function with the necessary args
        func = partial(svm, input_data)
        params = itertools.product(range(total_combinations), repeat=total_features)

        # Calculate scores concurrently; only the best result is kept
        for score, param in pool.imap_unordered(func, iterable=params, chunksize=10):
            if score > best_score:
                best_score = score
                best_param = param

        pool.close()
        pool.join()

In this example you will notice that RAM usage grows over time. It isn't much here, but if you leave it running for a day or so (by increasing the range of the iterable), it will reach GBs of RAM. As I said, I have millions of combinations.

How should I solve this? Is there an alternative to imap that does not store anything about the function at all? Should I just create Processes instead of using a Pool? Could it be that I'm copying the dataset and the garbage collector never cleans it up?
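
(Side note on scale: the combination space itself is not what uses the memory. itertools.product is lazy, so even 2**45 combinations cost almost nothing until they are consumed; a minimal check, independent of the SVM code above:)

```python
import itertools

# 2**45 combinations exist, but the iterator is created instantly
# and holds no more than one combination at a time.
combos = itertools.product(range(2), repeat=45)

first = next(combos)
print(first.count(0))   # 45: the first combination is all zeros
print(2 ** 45)          # 35184372088832 total combinations
```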

2 Answers

牧羊人nacy

Contributed 1862 experience points · earned 7+ upvotes

You can use apply or apply_async instead.


Replied 2023-08-08
慕容3067478

Contributed 1773 experience points · earned 3+ upvotes

I traced the memory usage with import objgraph and by printing objgraph.show_most_common_types(limit=20). I noticed that the number of tuples and lists kept increasing for the lifetime of the child processes. To fix this, I set maxtasksperchild on the Pool so that worker processes are shut down after a number of tasks, which releases their memory.


from functools import partial
import itertools
import multiprocessing
import random
import time

# Tracing memory leaks
import objgraph


def svm(input_data, params):
    # Copy the data to avoid changing the original data,
    # as input_data is a reference to a pandas dataframe.
    dataset = input_data.copy()

    # Use svm here to analyse data
    score = sum(dataset) + sum(params)  # simulate score of svm

    # Simulate a process that takes a bit of time
    time.sleep(0.5)

    return (score, params)


if __name__ == "__main__":
    # Without this, multiprocessing gives an error on Windows
    multiprocessing.freeze_support()

    # iterable settings
    total_combinations = 2
    total_features = 12

    # Keep track of best score
    best_score = -1000
    best_param = [0 for _ in range(total_features)]

    # Simulate a dataframe with random data
    input_data = [random.random() for _ in range(100000)]

    # Create a partial function with the necessary args
    func = partial(svm, input_data)
    params = itertools.product(range(total_combinations), repeat=total_features)

    # maxtasksperchild replaces each worker process after it has
    # handled that many tasks, releasing whatever memory it holds.
    # Pool() with no process count uses all available cores.
    with multiprocessing.Pool(maxtasksperchild=5) as pool:

        # Calculate scores concurrently. The iterable is in the order
        # of millions, so only the current best score/params pair is
        # kept; nothing else is stored.
        for score, param in pool.imap_unordered(func, iterable=params, chunksize=10):
            if score > best_score:
                best_score = score
                best_param = param
                # print(best_score)

            # Count the objects in memory; if the counts keep
            # increasing, it's a memory leak. (show_most_common_types
            # prints its report itself and returns None, so it should
            # not be wrapped in print().)
            objgraph.show_most_common_types(limit=20)

    # The with-block terminates the pool on exit, so no explicit
    # close()/join() is needed here.

    print(best_score)
    print(best_param)



Replied 2023-08-08