为了账号安全,请及时绑定邮箱和手机立即绑定

python过滤器+多处理+迭代器延迟加载

python过滤器+多处理+迭代器延迟加载

慕少森 2022-07-26 21:48:56
我有一个二维数组,它产生一个巨大的(> 300GB)组合列表,所以我想对 itertools.combinations 生成的迭代器进行惰性迭代并并行化这个操作。问题是我需要过滤输出,而 Multiprocessing 不支持。我现有的解决方法需要将组合列表加载到内存中,由于列表的大小,这也不起作用。n_nodes = np.random.randn(10, 100)cutoff=0.3def node_combinations(nodes):    return itertools.combinations(list(range(len(nodes))), 2)    def pfilter(func, candidates):    return np.asarray([c for c, keep in zip(candidates, pool.map(func, candidates)) if keep])def pearsonr(xy: tuple):    correlation_coefficient = scipy.stats.pearsonr(n_nodes[xy[0]], n_nodes[xy[1]])[0]    if correlation_coefficient >= cutoff:            return True        else:            return Falseedgelist = pfilter(pearsonr, node_combinations(n_nodes))我正在寻找一种使用带过滤器而不是映射的多处理对大型迭代器进行惰性评估的方法。
查看完整描述

2 回答

?
犯罪嫌疑人X

TA贡献2080条经验 获得超4个赞

下面使用信号量来减慢过度急切的池线程。不是正确的解决方案,因为它不能解决其他问题,例如使用相同池的嵌套循环和循环 imap 的结果在任何内部循环作业开始之前完成其外部循环的作业。但它确实限制了内存使用:


def slowdown(n=16):

    s = threading.Semaphore(n)

    def inner(it):

        for item in it:

            s.acquire()

            yield item

    def outer(it):

        for item in it:

            s.release()

            yield item

    return outer, inner

这用于包装pool.imap:


outer, inner = slowdown()

outer(pool.imap(func, inner(candidates)))


查看完整回答
反对 回复 2022-07-26
?
Cats萌萌

TA贡献1805条经验 获得超9个赞

Hoxha 的建议效果很好——谢谢!


@Dan 的问题是,即使是空列表也会占用内存,420 亿个配对在内存中接近 3TB。


这是我的实现:


import more_itertools

import itertools

import multiprocessing as mp

import numpy as np

import scipy

from tqdm import tqdm


n_nodes = np.random.randn(10, 100)

num_combinations = int((int(n_nodes.shape[0]) ** 2) - int(n_nodes.shape[0]) // 2)

cpu_count = 8

cutoff=0.3


def node_combinations(nodes):

    return itertools.combinations(list(range(len(nodes))), 2)    


def edge_gen(xy_iterator: type(itertools.islice)):

    edges = []

    for cand in tqdm(xy_iterator, total=num_combinations//cpu_count)

        if pearsonr(cand):

            edges.append(cand)


def pearsonr(xy: tuple):

    correlation_coefficient = scipy.stats.pearsonr(n_nodes[xy[0]], n_nodes[xy[1]])[0]

    if correlation_coefficient >= cutoff:

            return True

        else:

            return False



slices = more_itertools.distribute(cpu_count), node_combinations(n_nodes))

pool = mp.Pool(cpu_count)

results = pool.imap(edge_gen, slices)

pool.close()

pool.join()


查看完整回答
反对 回复 2022-07-26
  • 2 回答
  • 0 关注
  • 128 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号