为了账号安全,请及时绑定邮箱和手机立即绑定

使用 concurrent.futures 进行并行处理

使用 concurrent.futures 进行并行处理

繁花如伊 2022-07-12 10:00:54
我试图找到一种方法来使用不同的方法对数据帧进行并行处理,如本教程所示:https ://www.youtube.com/watch?v= fKl2JW_qrso (min >18:26)。但结果显示我出了点问题。代码的想法是在数据框中创建一个新列 ['分母'],其中包含“basalareap”、“basalareas”、“basalaread”列中每个字段的行和。任何建议这里有什么问题,我在打印时得到了这个奇怪的结果?此外,还有其他方法可以最有效地进行并行化吗?import pandas as pdimport numpy as npimport concurrent.futuresfrom multiprocessing import cpu_countnp.random.seed(4)layer = pd.DataFrame(np.random.randint(0,25,size=(10, 3)),                  columns=list(['basalareap', 'basalareas', 'basalaread']))def denom():    layer['denominator'] = layer[["basalareap","basalareas","basalaread"]].sum(axis=1)data_split = np.array_split(layer,cpu_count())with concurrent.futures.ProcessPoolExecutor() as executor:    results = [executor.submit(denom) for i in data_split]print(results)>>>print(results)[<Future at 0x1b45e325108 state=finished raised BrokenProcessPool>, <Future at 0x1b45e357708 state=finished raised BrokenProcessPool>, <Future at 0x1b45e3577c8 state=finished raised BrokenProcessPool>, <Future at 0x1b45e357888 state=finished raised BrokenProcessPool>, <Future at 0x1b45e357948 state=finished raised BrokenProcessPool>, <Future at 0x1b45e357a48 state=finished raised BrokenProcessPool>, <Future at 0x1b45e357b08 state=finished raised BrokenProcessPool>, <Future at 0x1b45e357bc8 state=finished raised BrokenProcessPool>]我的系统:Windows 10 python 3.7.4
查看完整描述

1 回答

?
手掌心

TA贡献1942条经验 获得超3个赞

这是一种可以使其工作的方法(使用您的示例数据):


import pandas as pd

import numpy as np

import concurrent.futures as cf

from multiprocessing import cpu_count


np.random.seed(4)

layer = pd.DataFrame(np.random.randint(0,25,size=(10, 3)),

                  columns=list(['basalareap', 'basalareas', 'basalaread']))


def denom(layer):

    layer['denominator'] = layer[["basalareap","basalareas","basalaread"]].sum(axis=1)

    return layer


if __name__ == '__main__':


    data_split = np.array_split(layer,cpu_count())


    # create a function to for process tasks

    def cpu_tasks(func, *args):


        with cf.ProcessPoolExecutor() as tp:

            result = tp.map(func, chunksize=10, *args)

        return list(result)


    # get result

    newdf = cpu_tasks(denom, data_split)


    # convert list to dataframe

    newdf = pd.concat(newdf)

    print(newdf)



       basalareap  basalareas  basalaread  denominator

    0          14          23           5           42

    1           1           8          23           32

    2           8          18           9           35

    3           7          13          23           43

    4          23           8           4           35

    5          18          12           6           36

    6          10          20           3           33

    7           0          23          21           44

    8          21           9           6           36

    9           6          24           2           32


查看完整回答
反对 回复 2022-07-12
  • 1 回答
  • 0 关注
  • 231 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号