为了账号安全,请及时绑定邮箱和手机立即绑定

在 Python Apache Beam 中使用 value provider 参数的方法

在 Python Apache Beam 中使用 value provider 参数的方法

慕田峪4524236 2022-06-14 09:56:25
现在我只能使用 ParDo 在类中获取 RunTime 值,是否有另一种方法可以像在我的函数中一样使用运行时参数?这是我现在得到的代码:class UserOptions(PipelineOptions):    @classmethod    def _add_argparse_args(cls, parser):        parser.add_value_provider_argument('--firestore_document',default='')def run(argv=None):    parser = argparse.ArgumentParser()    pipeline_options = PipelineOptions()    user_options = pipeline_options.view_as(UserOptions)    pipeline_options.view_as(SetupOptions).save_main_session = True    with beam.Pipeline(options=pipeline_options) as p:        rows = (p         | 'Create inputs' >> beam.Create([''])         | 'Call Firestore' >> beam.ParDo(                CallFirestore(user_options.firestore_document))         | 'Read DB2' >> beam.Map(ReadDB2))我希望 user_options.firestore_document 可以在其他功能中使用,而无需执行 ParDo
查看完整描述

1 回答

?
LEATH

TA贡献1936条经验 获得超7个赞

您可以使用价值提供者的唯一方法是在 ParDos 和 Combines 中。不可能在创建中传递值提供者,但您可以定义一个 DoFn,它返回您在构造函数中传递给它的值提供者:


class OutputValueProviderFn(beam.DoFn):

  def __init__(self, vp):

    self.vp = vp


  def process(self, unused_elm):

    yield self.vp.get()

在您的管道中,您将执行以下操作:


user_options = pipeline_options.view_as(UserOptions)


with beam.Pipeline(options=pipeline_options) as p:

  my_value_provided_pcoll = (

      p

      | beam.Create([None])

      | beam.ParDo(OutputValueProviderFn(user_options.firestore_document))

这样你就不会在 Create 中使用它,因为这是不可能的,但你仍然可以在 PCollection 中获得它。


查看完整回答
反对 回复 2022-06-14
  • 1 回答
  • 0 关注
  • 98 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号