2 回答
TA贡献1856条经验 获得超5个赞
import multiprocessing
import os
import time
in_queue = multiprocessing.Queue()
out_queue = multiprocessing.Queue()
def worker_main(in_queue, out_queue):
print (os.getpid(), "working")
while True:
item = in_queue.get(True)
print (os.getpid(), "got", item)
time.sleep(1) #long network processing
print (os.getpid(), "done", item)
# put the processed items to be written to disl
out_queue.put("processed:" + str(item))
pool = multiprocessing.Pool(3, worker_main,(in_queue,out_queue))
for i in range(5): # let's assume this is the file reading part
in_queue.put(i)
with open('out.txt', 'w') as file:
while not out_queue.empty():
try:
value = q.get(timeout = 1)
file.write(value + '\n')
except Exception as qe:
print ("Empty Queue or dead process")
TA贡献2021条经验 获得超8个赞
我在尝试执行您的代码时遇到的第一个问题是:
An attempt has been made to start a new process before the current process has finished
its bootstrapping phase. This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom in the main module
我必须在if __name__ == '__main__':习语中包装任何模块范围指令。在这里阅读更多。
由于您的目标是遍历文件的行,因此Pool.imap()似乎很合适。该imap()文档参考map()文档,不同的是imap()懒洋洋地提取从可迭代的下一个项目(在你的情况将是CSV文件),这将是有益的,如果您的CSV文件较大。所以从map()文档:
此方法将可迭代对象分成多个块,并将其作为单独的任务提交给进程池。
imap() 返回一个迭代器,这样您就可以遍历流程工作人员产生的结果以执行您必须对它们执行的操作(在您的示例中,它将结果写入文件)。
这是一个工作示例:
import multiprocessing
import os
import time
def worker_main(item):
print(os.getpid(), "got", item)
time.sleep(1) #long network processing
print(os.getpid(), "done", item)
# put the processed items to be written to disl
return "processed:" + str(item)
if __name__ == '__main__':
with multiprocessing.Pool(3) as pool:
with open('out.txt', 'w') as file:
# range(5) simulating a 5 row csv file.
for proc_row in pool.imap(worker_main, range(5)):
file.write(proc_row + '\n')
# printed output:
# 1368 got 0
# 9228 got 1
# 12632 got 2
# 1368 done 0
# 1368 got 3
# 9228 done 1
# 9228 got 4
# 12632 done 2
# 1368 done 3
# 9228 done 4
out.txt 看起来像这样:
processed:0
processed:1
processed:2
processed:3
processed:4
请注意,我也不必使用任何队列。
添加回答
举报
