为了账号安全,请及时绑定邮箱和手机立即绑定

处理多个进程中的单个文件

处理多个进程中的单个文件

慕仙森 2019-12-11 14:08:56
我有一个大文本文件,我想在其中处理每一行(执行一些操作)并将它们存储在数据库中。由于一个简单的程序花费的时间太长,我希望它可以通过多个进程或线程来完成。每个线程/进程应从单个文件中读取不同的数据(不同的行),并对它们的数据(行)进行一些操作,然后将它们放入数据库中,以便最后我可以处理所有数据,数据库随我需要的数据一起转储了。但是我无法弄清楚如何解决这个问题。
查看完整描述

3 回答

?
一只斗牛犬

TA贡献1784条经验 获得超2个赞

您正在寻找的是生产者/消费者模式


基本线程示例


这是使用线程模块的基本示例(而不是多处理)


import threading

import Queue

import sys


def do_work(in_queue, out_queue):

    while True:

        item = in_queue.get()

        # process

        result = item

        out_queue.put(result)

        in_queue.task_done()


if __name__ == "__main__":

    work = Queue.Queue()

    results = Queue.Queue()

    total = 20


    # start for workers

    for i in xrange(4):

        t = threading.Thread(target=do_work, args=(work, results))

        t.daemon = True

        t.start()


    # produce data

    for i in xrange(total):

        work.put(i)


    work.join()


    # get the results

    for i in xrange(total):

        print results.get()


    sys.exit()

您不会与线程共享文件对象。您可以通过为队列提供数据行来为他们工作。然后,每个线程将拾取一条线,对其进行处理,然后将其返回到队列中。


多处理模块中内置了一些更高级的功能,可以共享数据,例如列表和特殊的Queue。在使用多处理与线程时需要权衡取舍,这取决于您的工作是CPU约束还是IO约束。


基本的多处理池示例


这是一个多处理池的基本示例


from multiprocessing import Pool


def process_line(line):

    return "FOO: %s" % line


if __name__ == "__main__":

    pool = Pool(4)

    with open('file.txt') as source_file:

        # chunk the work into batches of 4 lines at a time

        results = pool.map(process_line, source_file, 4)


    print results

池是管理其自身进程的便捷对象。由于打开的文件可以遍历其行,因此可以将其传递到pool.map(),该文件将循环遍历并将行传递给worker函数。映射将阻止并在完成后返回整个结果。请注意,这是一个过于简化的示例,pool.map()在进行工作之前,它将立即将整个文件读入内存。如果您希望有大文件,请记住这一点。有更多高级方法可以设计生产者/消费者设置。


手动“池”,具有限制和行重新排序


这是Pool.map的手动示例,但是您可以设置队列大小,以使您仅以其可以处理的最快速度逐个喂入,而不是一次性消耗整个可迭代对象。我还添加了行号,以便以后可以跟踪它们并引用它们。


from multiprocessing import Process, Manager

import time

import itertools 


def do_work(in_queue, out_list):

    while True:

        item = in_queue.get()

        line_no, line = item


        # exit signal 

        if line == None:

            return


        # fake work

        time.sleep(.5)

        result = (line_no, line)


        out_list.append(result)



if __name__ == "__main__":

    num_workers = 4


    manager = Manager()

    results = manager.list()

    work = manager.Queue(num_workers)


    # start for workers    

    pool = []

    for i in xrange(num_workers):

        p = Process(target=do_work, args=(work, results))

        p.start()

        pool.append(p)


    # produce data

    with open("source.txt") as f:

        iters = itertools.chain(f, (None,)*num_workers)

        for num_and_line in enumerate(iters):

            work.put(num_and_line)


    for p in pool:

        p.join()


    # get the results

    # example:  [(1, "foo"), (10, "bar"), (0, "start")]

    print sorted(results)



查看完整回答
反对 回复 2019-12-12
?
绝地无双

TA贡献1946条经验 获得超4个赞

最好将一个大文件分成多个较小的文件,并在单独的线程中进行处理。

查看完整回答
反对 回复 2019-12-12
  • 3 回答
  • 0 关注
  • 277 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信