为了账号安全,请及时绑定邮箱和手机立即绑定

将任务添加到父级的多处理池

将任务添加到父级的多处理池

紫衣仙女 2023-06-06 17:27:45
如何将新任务添加到multiprocessing在父进程中初始化的池中?以下不起作用:from multiprocessing import Pooldef child_task(x):    # the child task spawns new tasks    results = p.map(grandchild_task, [x])    return results[0]def grandchild_task(x):    return xif __name__ == '__main__':    p = Pool(2)    print(p.map(child_task, [0]))    # Result: NameError: name 'p' is not defined动机:我需要并行化一个由各种子任务组成的程序,这些子任务本身也有子任务(即孙任务)。仅并行化子任务或孙任务不会利用我所有的 CPU 内核。在我的用例中,我有各种子任务(可能有 1-50 个),每个子任务有很多孙子任务(可能有 100-1000 个)。替代方案:如果使用 Python 的多处理包无法做到这一点,我很乐意切换到另一个支持它的库。
查看完整描述

2 回答

?
鸿蒙传说

TA贡献1865条经验 获得超7个赞

有一个最小的可重现示例这样的东西,然后除此之外还删除了太多代码,最终得到的东西 (1) 可能过于简单化了,有危险而不是答案可能会错过标记,并且 (2)不可能如图所示运行(您需要将创建 Pool 和提交任务的代码包含在由语句控制的块中if __name__ == '__main__':。


但是根据您所展示的内容,我不认为 Pool 是适合您的解决方案;您应该根据需要创建 Process 实例。从进程中获取结果的一种方法是将它们存储在可共享的托管字典中,其键例如是创建结果的进程的进程 ID。


为了扩展您的示例,向子任务传递了两个参数,x并且y需要作为结果返回x**2 + 'y**2。子任务将产生两个孙任务实例,每个实例计算其参数的平方。然后,子任务将使用加法组合这些进程的返回值:


from multiprocessing import Process, Manager

import os



def child_task(results_dict, x, y):

    # the child task spawns new tasks

    p1 = Process(target=grandchild_task, args=(results_dict, x))

    p1.start()

    pid1 = p1.pid

    p2 = Process(target=grandchild_task, args=(results_dict, y))

    p2.start()

    pid2 = p2.pid

    p1.join()

    p2.join()

    pid = os.getpid()

    results_dict[pid] = results_dict[pid1] + results_dict[pid2]




def grandchild_task(results_dict, n):

    pid = os.getpid()

    results_dict[pid] = n * n



def main():

    manager = Manager()

    results_dict = manager.dict()

    p = Process(target=child_task, args=(results_dict, 2, 3))

    p.start()

    pid = p.pid

    p.join()

    # results will be stored with key p.pid:

    print(results_dict[pid])


if __name__ == '__main__':

    main()

印刷:


13

更新


例如,如果您确实遇到这样一种情况,child_task需要处理 N 个相同的调用,只是参数不同,但它必须产生一两个子进程,那么像以前一样使用 Pool,但另外传递一个要使用的托管child_task字典用于产生额外的进程(不尝试为此使用池)并检索它们的结果。


更新 2


我能弄清楚子进程本身使用池的唯一方法是使用模块ProcessPoolExecutor中的类concurrent.futures。当我试图用 做同样的事情时multiprocessing.Pool,我得到了一个错误,因为我们有守护进程试图创建自己的进程。但即使在这里,唯一的方法是池中的每个进程都有自己的进程池。您的计算机上只有有限数量的处理器/内核,因此除非在处理中混合了一些 I/O,否则您可以创建所有这些池,但进程将等待运行的机会。因此,尚不清楚将实现什么样的性能提升。还有关闭为child_task子进程创建的所有池的问题。通常一个ProcessPoolExecutor实例是使用with块,当该块终止时,将清理创建的池。但是child_task被重复调用并且显然不能使用with块,因为我们不希望不断地创建和销毁池。我来到这里有点麻烦:传递了第三个参数,True 或 False,指示是否child_task应该启动其池的关闭。此参数的默认值为 False,我们甚至懒得传递它。在检索到所有实际结果并且child_task进程现在空闲之后,我们提交 N 个具有虚拟值但shutdown设置为 True 的新任务。请注意,该ProcessPoolExecutor函数的map工作方式与类中的相同函数有很大不同Pool(阅读文档):


from concurrent.futures import ProcessPoolExecutor

import time



child_executor = None



def child_task(x, y, shutdown=False):

    global child_executor


    if child_executor is None:

        child_executor = ProcessPoolExecutor(max_workers=1)

    if shutdown:

        if child_executor:

            child_executor.shutdown(False)

            child_executor = None

            time.sleep(.2) # make sure another process in the pool gets the next task

        return None

    # the child task spawns new task(s)

    future = child_executor.submit(grandchild_task, y)

    # we can compute one of the results using the current process:

    return grandchild_task(x) + future.result()



def grandchild_task(n):

    return n * n



def main():

    N_WORKERS = 2

    with ProcessPoolExecutor(max_workers=N_WORKERS) as executor:

        # first call is (1, 2), second call is (3, 4):

        results = [result for result in executor.map(child_task, (1, 3), (2, 4))]

        print(results)

        # force a shutdown

        # need N_WORKERS invocations:

        [result for result in executor.map(child_task, (0,) * N_WORKERS, (0,) * N_WORKERS, (True,) * N_WORKERS)]



if __name__ == '__main__':

    main()

印刷:


[5, 25]


查看完整回答
反对 回复 2023-06-06
?
holdtom

TA贡献1805条经验 获得超10个赞

检查此解决方案:


#!/usr/bin/python

# requires Python version 3.8 or higher


from multiprocessing import Queue, Process

import time

from random import randrange

import os

import psutil



# function to be run by each child process

def square(number):

    sleep = randrange(5)

    time.sleep(sleep)

    print(f'Result is {number * number}, computed by pid {os.getpid()}...sleeping {sleep} secs')



# create a queue where all tasks will be placed

queue = Queue()


# indicate how many number of children you want the system to create to run the tasks

number_of_child_proceses = 5


# put all tasks in the queue above

for task in range(19):

    queue.put(task)



# this the main entry/start of the program when you run

def main():

    number_of_task = queue.qsize()

    print(f'{"_" * 60}\nBatch: {number_of_task // number_of_child_proceses + 1} \n{"_" * 60}')


    # don't create more number of children than the number of tasks. Also, in the last round, wait for all child process

    # to complete so as to wrap up everything

    if number_of_task <= number_of_child_proceses:

        processes = [Process(target=square, args=(queue.get(),)) for _ in

                     range(number_of_task)]

        for p in processes:

            p.start()

            p.join()


    else:

        processes = [Process(target=square, args=(queue.get(),)) for _ in range(number_of_child_proceses)]

        for p in processes:

            p.start()


    # update count of remaining task

    number_of_task = queue.qsize()


    # run the program in a loop until no more task remains in the queue

    while number_of_task:

        current_process = psutil.Process()

        children = current_process.children()


        # if children process have completed assigned task but there is still more remaining tasks in the queue,

        # assign them more tasks

        if not len(children) and number_of_task:

            print(f'\nAssigned tasks completed... reasigning the remaining {number_of_task} task(s) in the queue\n')

            main()


    # exit the loop if no more task in the queue to work on


    print('\nAll tasks completed!!')

    exit()



if __name__ == "__main__":

    main()



查看完整回答
反对 回复 2023-06-06
?
梵蒂冈之花

TA贡献1900条经验 获得超5个赞

我环顾四周,找到了Ray,它使用嵌套的远程函数解决了这个确切的用例。



查看完整回答
反对 回复 2023-06-06
  • 2 回答
  • 0 关注
  • 99 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信