为了账号安全,请及时绑定邮箱和手机立即绑定

控制 Apache Beam Dataflow 管道中的并行性

控制 Apache Beam Dataflow 管道中的并行性

白板的微信 2022-08-01 15:09:37
我们正在试验Apache Beam(使用Go SDK)和Dataflow来并行化我们耗时的任务之一。为了更多的上下文,我们有缓存作业,它接受一些查询,跨数据库运行它并缓存它们。每个数据库查询可能需要几秒钟到几分钟的时间,我们希望并行运行这些查询以更快地完成任务。创建了一个简单的管道,如下所示:    // Create initial PCollection.    startLoad := beam.Create(s, "InitialLoadToStartPipeline")    // Emits a unit of work along with query and date range.    cachePayloads := beam.ParDo(s, &getCachePayloadsFn{Config: config}, startLoad)    // Emits a cache response which includes errCode, errMsg, time etc.    cacheResponses := beam.ParDo(s, &cacheQueryDoFn{Config: config}, cachePayloads)    ...排放的数量单位不是很多,在生产中将主要以数百个和最多几千个为单位。getCachePayloadsFn现在的问题是没有并行执行,查询是逐个按顺序执行的。我们通过在缓存函数中输入日志并记录goroutine id,进程ID,开始和结束时间等来确认这一点,以确认执行中没有重叠。cacheQueryDoFnStartBundleProcessElement我们希望始终并行运行查询,即使只有 10 个查询。根据我们的理解和文档,它从整体输入创建捆绑包,这些捆绑包并行运行,并且在捆绑包中按顺序运行。有没有办法控制来自负载的捆绑包的数量,或者有没有办法增加并行度?我们尝试过的事情:保留 和 。它启动两个 VM,但运行方法以仅在一个 VM 上初始化 DoFn,并将其用于整个负载。num_workers=2autoscaling_algorithm=NoneSetup在此处找到选项。但不知道如何正确设置它。已尝试使用 设置它。无效果。sdk_worker_parallelismbeam.PipelineOptions.Set("sdk_worker_parallelism", "50")
查看完整描述

1 回答

?
富国沪深

TA贡献1790条经验 获得超9个赞

默认情况下,Create 不是并行的,并且所有 DoFn 都融合到与 Create 相同的阶段中,因此它们也没有并行性。有关此内容的详细信息,请参阅 https://beam.apache.org/documentation/runtime/model/#dependent-parallellism

您可以使用重新洗牌变换显式强制融合中断。


查看完整回答
反对 回复 2022-08-01
  • 1 回答
  • 0 关注
  • 168 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号