2 回答

TA贡献2080条经验 获得超4个赞
下面使用信号量来减慢过度急切的池线程。不是正确的解决方案,因为它不能解决其他问题,例如使用相同池的嵌套循环和循环 imap 的结果在任何内部循环作业开始之前完成其外部循环的作业。但它确实限制了内存使用:
def slowdown(n=16):
s = threading.Semaphore(n)
def inner(it):
for item in it:
s.acquire()
yield item
def outer(it):
for item in it:
s.release()
yield item
return outer, inner
这用于包装pool.imap:
outer, inner = slowdown()
outer(pool.imap(func, inner(candidates)))

TA贡献1805条经验 获得超9个赞
Hoxha 的建议效果很好——谢谢!
@Dan 的问题是,即使是空列表也会占用内存,420 亿个配对在内存中接近 3TB。
这是我的实现:
import more_itertools
import itertools
import multiprocessing as mp
import numpy as np
import scipy
from tqdm import tqdm
n_nodes = np.random.randn(10, 100)
num_combinations = int((int(n_nodes.shape[0]) ** 2) - int(n_nodes.shape[0]) // 2)
cpu_count = 8
cutoff=0.3
def node_combinations(nodes):
return itertools.combinations(list(range(len(nodes))), 2)
def edge_gen(xy_iterator: type(itertools.islice)):
edges = []
for cand in tqdm(xy_iterator, total=num_combinations//cpu_count)
if pearsonr(cand):
edges.append(cand)
def pearsonr(xy: tuple):
correlation_coefficient = scipy.stats.pearsonr(n_nodes[xy[0]], n_nodes[xy[1]])[0]
if correlation_coefficient >= cutoff:
return True
else:
return False
slices = more_itertools.distribute(cpu_count), node_combinations(n_nodes))
pool = mp.Pool(cpu_count)
results = pool.imap(edge_gen, slices)
pool.close()
pool.join()
添加回答
举报