亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

python過濾器+多處理+迭代器延遲加載

python過濾器+多處理+迭代器延遲加載

慕少森 2022-07-26 21:48:56
我有一個二維數組,它產生一個巨大的(> 300GB)組合列表,所以我想對 itertools.combinations 生成的迭代器進行惰性迭代并并行化這個操作。問題是我需要過濾輸出,而 Multiprocessing 不支持。我現有的解決方法需要將組合列表加載到內存中,由于列表的大小,這也不起作用。n_nodes = np.random.randn(10, 100)cutoff=0.3def node_combinations(nodes):    return itertools.combinations(list(range(len(nodes))), 2)    def pfilter(func, candidates):    return np.asarray([c for c, keep in zip(candidates, pool.map(func, candidates)) if keep])def pearsonr(xy: tuple):    correlation_coefficient = scipy.stats.pearsonr(n_nodes[xy[0]], n_nodes[xy[1]])[0]    if correlation_coefficient >= cutoff:            return True        else:            return Falseedgelist = pfilter(pearsonr, node_combinations(n_nodes))我正在尋找一種使用帶過濾器而不是映射的多處理對大型迭代器進行惰性評估的方法。
查看完整描述

2 回答

?
犯罪嫌疑人X

TA貢獻2080條經驗 獲得超4個贊

下面使用信號量來減慢過度急切的池線程。不是正確的解決方案,因為它不能解決其他問題,例如使用相同池的嵌套循環和循環 imap 的結果在任何內部循環作業開始之前完成其外部循環的作業。但它確實限制了內存使用:


def slowdown(n=16):

    s = threading.Semaphore(n)

    def inner(it):

        for item in it:

            s.acquire()

            yield item

    def outer(it):

        for item in it:

            s.release()

            yield item

    return outer, inner

這用于包裝pool.imap:


outer, inner = slowdown()

outer(pool.imap(func, inner(candidates)))


查看完整回答
反對 回復 2022-07-26
?
Cats萌萌

TA貢獻1805條經驗 獲得超9個贊

Hoxha 的建議效果很好——謝謝!


@Dan 的問題是,即使是空列表也會占用內存,420 億個配對在內存中接近 3TB。


這是我的實現:


import more_itertools

import itertools

import multiprocessing as mp

import numpy as np

import scipy

from tqdm import tqdm


n_nodes = np.random.randn(10, 100)

num_combinations = int((int(n_nodes.shape[0]) ** 2) - int(n_nodes.shape[0]) // 2)

cpu_count = 8

cutoff=0.3


def node_combinations(nodes):

    return itertools.combinations(list(range(len(nodes))), 2)    


def edge_gen(xy_iterator: type(itertools.islice)):

    edges = []

    for cand in tqdm(xy_iterator, total=num_combinations//cpu_count)

        if pearsonr(cand):

            edges.append(cand)


def pearsonr(xy: tuple):

    correlation_coefficient = scipy.stats.pearsonr(n_nodes[xy[0]], n_nodes[xy[1]])[0]

    if correlation_coefficient >= cutoff:

            return True

        else:

            return False



slices = more_itertools.distribute(cpu_count), node_combinations(n_nodes))

pool = mp.Pool(cpu_count)

results = pool.imap(edge_gen, slices)

pool.close()

pool.join()


查看完整回答
反對 回復 2022-07-26
  • 2 回答
  • 0 關注
  • 127 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號