2 回答

TA貢獻2080條經驗 獲得超4個贊
下面使用信號量來減慢過度急切的池線程。不是正確的解決方案,因為它不能解決其他問題,例如使用相同池的嵌套循環和循環 imap 的結果在任何內部循環作業開始之前完成其外部循環的作業。但它確實限制了內存使用:
def slowdown(n=16):
s = threading.Semaphore(n)
def inner(it):
for item in it:
s.acquire()
yield item
def outer(it):
for item in it:
s.release()
yield item
return outer, inner
這用于包裝pool.imap:
outer, inner = slowdown()
outer(pool.imap(func, inner(candidates)))

TA貢獻1805條經驗 獲得超9個贊
Hoxha 的建議效果很好——謝謝!
@Dan 的問題是,即使是空列表也會占用內存,420 億個配對在內存中接近 3TB。
這是我的實現:
import more_itertools
import itertools
import multiprocessing as mp
import numpy as np
import scipy
from tqdm import tqdm
n_nodes = np.random.randn(10, 100)
num_combinations = int((int(n_nodes.shape[0]) ** 2) - int(n_nodes.shape[0]) // 2)
cpu_count = 8
cutoff=0.3
def node_combinations(nodes):
return itertools.combinations(list(range(len(nodes))), 2)
def edge_gen(xy_iterator: type(itertools.islice)):
edges = []
for cand in tqdm(xy_iterator, total=num_combinations//cpu_count)
if pearsonr(cand):
edges.append(cand)
def pearsonr(xy: tuple):
correlation_coefficient = scipy.stats.pearsonr(n_nodes[xy[0]], n_nodes[xy[1]])[0]
if correlation_coefficient >= cutoff:
return True
else:
return False
slices = more_itertools.distribute(cpu_count), node_combinations(n_nodes))
pool = mp.Pool(cpu_count)
results = pool.imap(edge_gen, slices)
pool.close()
pool.join()
添加回答
舉報