Inspired by this solution, I am trying to set up a multiprocessing pool of worker processes in Python. The idea is to pass some data to the worker processes before they actually start their work and to eventually reuse it. The intention is to minimize the amount of data that has to be packed/unpacked on every call into a worker process (i.e. to reduce the inter-process communication overhead). My MCVE looks like this:

    import multiprocessing as mp
    import numpy as np

    def create_worker_context():
        global context  # create "global" context in worker process
        context = {}

    def init_worker_context(worker_id, some_const_array, DIMS, DTYPE):
        context.update({
            'worker_id': worker_id,
            'some_const_array': some_const_array,
            'tmp': np.zeros((DIMS, DIMS), dtype = DTYPE),
            })  # store context information in global namespace of worker
        return True  # return True, verifying that the worker process received its data

    class data_analysis:
        def __init__(self):
            self.DTYPE = 'float32'
            self.CPU_LEN = mp.cpu_count()
            self.DIMS = 100
            self.some_const_array = np.zeros((self.DIMS, self.DIMS), dtype = self.DTYPE)
            # Init multiprocessing pool
            self.cpu_pool = mp.Pool(
                processes = self.CPU_LEN,
                initializer = create_worker_context
                )  # create pool and context in workers
            pool_results = [
                self.cpu_pool.apply_async(
                    init_worker_context,
                    args = (core_id, self.some_const_array, self.DIMS, self.DTYPE)
                ) for core_id in range(self.CPU_LEN)
                ]  # pass information to workers' context
            result_batches = [result.get() for result in pool_results]  # check if they got the information
            if not all(result_batches):  # raise an error if things did not work
                raise SyntaxError('Workers could not be initialized ...')

        @staticmethod
        def process_batch(batch_data):
            context['tmp'][:, :] = context['some_const_array'] + batch_data  # some fancy computation in worker
            return context['tmp']  # return result

I am running the above with CPython 3.6.6. I am confused. What is going on here?
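For reference, this is the pattern I was aiming for, written as a minimal sketch that hands each worker its data exactly once through Pool's initializer/initargs instead of a round of apply_async calls (the names in this sketch are illustrative and not taken from the MCVE above):

    import multiprocessing as mp
    import numpy as np

    context = {}  # per-process global, filled in by the initializer

    def init_worker(some_const_array):
        # runs once in every worker process when the pool starts it
        context['some_const_array'] = some_const_array
        context['tmp'] = np.zeros_like(some_const_array)

    def process_batch(batch_data):
        # reuse the preallocated per-worker buffer instead of shipping it per call
        context['tmp'][:, :] = context['some_const_array'] + batch_data
        return context['tmp']

    if __name__ == '__main__':
        const_array = np.zeros((100, 100), dtype='float32')
        with mp.Pool(processes=mp.cpu_count(),
                     initializer=init_worker,
                     initargs=(const_array,)) as pool:
            batches = [np.full((100, 100), i, dtype='float32') for i in range(4)]
            results = pool.map(process_batch, batches)
        print(results[2][0, 0])  # prints 2.0

Only the per-batch arrays cross the process boundary here; the constant array travels to each worker a single time at pool start-up.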