Python: using multiprocessing to speed up merging Counters

Asked by 喵喔喔 on 2022-11-01 16:47:06
I am trying to build a very simple item recommendation system based on how many times items were purchased together, so I first built an item2item dictionary that works like a counter:

# people purchased A with B 4 times, A with C 3 times.
item2item = {'A': {'B': 4, 'C': 3}, 'B': {'A': 4, 'C': 2}, 'C': {'A': 3, 'B': 2}}
# recommend to a user who purchased A and C
samples_list = [['A', 'C'], ...]

So for samples = ['A', 'C'], I recommend the items with the largest counts in item2item['A'] + item2item['C']. For a large matrix, however, the merging is heavy, so I tried to use multiprocessing as follows:

from operator import add
from functools import reduce
from concurrent.futures import ProcessPoolExecutor
from collections import Counter

with ProcessPoolExecutor(max_workers=10) as pool:
    for samples in samples_list:
        # w/o PoolExecutor
        # combined = reduce(add, [item2item[s] for s in samples], Counter())
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        combined = future.result()

However, this did not speed up the process at all. I suspect that the Counter inside the reduce call is not shared between processes, as discussed in "Python multiprocessing and shared counter" and https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes. Any help is appreciated.
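
For concreteness, here is a minimal single-process sketch of what I want to speed up, assuming the item2item values are stored as Counter objects (rather than plain dicts) so they can be added directly and ranked with most_common():

from collections import Counter

# assumption: values are Counters, so "+" / sum() and most_common() work
item2item = {
    'A': Counter({'B': 4, 'C': 3}),
    'B': Counter({'A': 4, 'C': 2}),
    'C': Counter({'A': 3, 'B': 2}),
}

samples = ['A', 'C']
combined = sum((item2item[s] for s in samples), Counter())  # Counter({'B': 6, 'C': 3, 'A': 3})
# drop items already in the basket, then rank by co-purchase count
recommendations = [item for item, _ in combined.most_common() if item not in samples]
print(recommendations)  # ['B']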

1 Answer

Answer by 收到一只叮咚

Calling combined = future.result() blocks until that result is complete, so you never submit the next task to the pool before the previous one has finished. In other words, you never have more than one subprocess running at a time. At the very least you should change the code to:


with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = []
    # you need: from tqdm import tqdm
    for samples in tqdm(samples_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures.append(future)  # save it
    results = [f.result() for f in the_futures]  # all the results, in submission order
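
As a small usage sketch (assuming item2item holds Counter objects so that reduce(add, ...) succeeds): the futures were appended in submission order, so results lines up with samples_list and each merged counter can be paired with the basket that produced it:

# results[i] corresponds to samples_list[i] because futures were appended in order
for samples, combined in zip(samples_list, results):
    # drop items already in the basket, then take the top co-purchased item
    recs = [item for item, _ in combined.most_common() if item not in samples]
    print(samples, '->', recs[:1])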

Alternatively:


with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = []
    for samples in tqdm(samples_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures.append(future)  # save it
    # you need: from concurrent.futures import as_completed
    for future in as_completed(the_futures):  # not necessarily in submission order
        result = future.result()  # do something with this result

Also, if you do not pass max_workers to the ProcessPoolExecutor constructor, it defaults to the number of processors on your machine. There is nothing to be gained by specifying a value larger than the number of processors you actually have.
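
For example, a quick sketch to check how large the default pool would be on your machine:

import os
from concurrent.futures import ProcessPoolExecutor

print(os.cpu_count())  # the processor count the default pool size is based on
with ProcessPoolExecutor() as pool:  # max_workers omitted: defaults to the processor count
    pass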


Update


If you want to process each result as soon as it completes and need a way to tie the result back to the original request, you can store the futures as keys in a dictionary whose values are the arguments of the corresponding request. In this case:


with ProcessPoolExecutor(max_workers=10) as pool:
    the_futures = {}
    for samples in tqdm(samples_list):
        future = pool.submit(reduce, add, [item2item[s] for s in samples], Counter())
        the_futures[future] = samples  # map future to request
    # you need: from concurrent.futures import as_completed
    for future in as_completed(the_futures):  # not necessarily in submission order
        samples = the_futures[future]  # the request
        result = future.result()  # the result
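
Inside that as_completed loop, "do something" could for instance mean turning each merged counter into recommendations and collecting them per basket; a sketch, again assuming the merged result is a Counter:

# collect recommendations keyed by the originating basket (hypothetical post-processing)
all_recs = {}
for future in as_completed(the_futures):
    samples = the_futures[future]  # the basket that was submitted
    combined = future.result()     # the merged Counter for that basket
    all_recs[tuple(samples)] = [item for item, _ in combined.most_common()
                                if item not in samples]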

