为了账号安全,请及时绑定邮箱和手机立即绑定

在 Python 中多处理一个文件,然后将结果写入磁盘

在 Python 中多处理一个文件,然后将结果写入磁盘

万千封印 2021-12-16 16:30:58
我想做以下事情:从 csv 文件中读取数据处理上述 csv 的每一行(假设这是一个长网络操作)将结果写入另一个文件我曾尝试将这个和这个答案粘合在一起,但收效甚微。第二个队列的代码永远不会被调用,因此不会发生写入磁盘的情况。我如何让进程知道有第二个队列?请注意,我不一定是multiprocessing. 如果async/await效果更好,我完全赞成。到目前为止我的代码
查看完整描述

2 回答

?
RISEBY

TA贡献1856条经验 获得超5个赞

import multiprocessing

import os

import time


in_queue = multiprocessing.Queue()

out_queue = multiprocessing.Queue()


def worker_main(in_queue, out_queue):

    print (os.getpid(), "working")

    while True:

        item = in_queue.get(True)

        print (os.getpid(), "got", item)

        time.sleep(1) #long network processing

        print (os.getpid(), "done", item)

        # put the processed items to be written to disl

        out_queue.put("processed:" + str(item))



pool = multiprocessing.Pool(3, worker_main,(in_queue,out_queue))


for i in range(5): # let's assume this is the file reading part

    in_queue.put(i)


with open('out.txt', 'w') as file:


    while not out_queue.empty():

        try:

            value = q.get(timeout = 1)

            file.write(value + '\n')

        except Exception as qe:

            print ("Empty Queue or dead process")


查看完整回答
反对 回复 2021-12-16
?
宝慕林4294392

TA贡献2021条经验 获得超8个赞

我在尝试执行您的代码时遇到的第一个问题是:


An attempt has been made to start a new process before the current process has finished 

its bootstrapping phase. This probably means that you are not using fork to start your 

child processes and you have forgotten to use the proper idiom in the main module

我必须在if __name__ == '__main__':习语中包装任何模块范围指令。在这里阅读更多。


由于您的目标是遍历文件的行,因此Pool.imap()似乎很合适。该imap()文档参考map()文档,不同的是imap()懒洋洋地提取从可迭代的下一个项目(在你的情况将是CSV文件),这将是有益的,如果您的CSV文件较大。所以从map()文档:


此方法将可迭代对象分成多个块,并将其作为单独的任务提交给进程池。


imap() 返回一个迭代器,这样您就可以遍历流程工作人员产生的结果以执行您必须对它们执行的操作(在您的示例中,它将结果写入文件)。


这是一个工作示例:


import multiprocessing

import os

import time



def worker_main(item):

    print(os.getpid(), "got", item)

    time.sleep(1) #long network processing

    print(os.getpid(), "done", item)

    # put the processed items to be written to disl

    return "processed:" + str(item)



if __name__ == '__main__':

    with multiprocessing.Pool(3) as pool:

        with open('out.txt', 'w') as file:

            # range(5) simulating a 5 row csv file.

            for proc_row in pool.imap(worker_main, range(5)):

                file.write(proc_row + '\n')


# printed output:

# 1368 got 0

# 9228 got 1

# 12632 got 2

# 1368 done 0

# 1368 got 3

# 9228 done 1

# 9228 got 4

# 12632 done 2

# 1368 done 3

# 9228 done 4

out.txt 看起来像这样:


processed:0

processed:1

processed:2

processed:3

processed:4

请注意,我也不必使用任何队列。


查看完整回答
反对 回复 2021-12-16
  • 2 回答
  • 0 关注
  • 253 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号