为了账号安全,请及时绑定邮箱和手机立即绑定

使用 Apach Beam 执行集差

使用 Apach Beam 执行集差

回首忆惘然 2023-05-23 10:13:23
我有两个列表a,b它们之间有一些共同的元素,我想找到那些共同的元素及其数量,为此我编写了以下程序。import functoolsimport apache_beam as beamfrom apache_beam.runners.interactive.interactive_runner import InteractiveRunnerfrom apache_beam.runners.direct.direct_runner import DirectRunnerfrom apache_beam.options.pipeline_options import PipelineOptionsoptions = PipelineOptions()p = beam.Pipeline(InteractiveRunner(underlying_runner=DirectRunner()), options=options)def form_pair(element, side_input):  for i,e in enumerate(side_input):    if e == element:      return i,ea = ['a','b', 'c', 'c', 'b']b = ['a','a','b', 'c', 'b', 'b','d', 'e', 'f']x0 = p | "0" >> beam.Create(a) | "1" >> beam.Distinct()x1 = beam.pvalue.AsList(x0)x3 = p | "2" >> beam.Create(b)x4 = x3 | "3" >> beam.Map(functools.partial(form_pair, side_input=x1))x5 = x4 | "4" >> beam.combiners.Count.PerKey()r = p.run().wait_until_finish()print(r.get(x5))这给了我以下错误TypeError: 'AsList' object is not iterable [while running '3']
查看完整描述

1 回答

?
尚方宝剑之说

TA贡献1788条经验 获得超4个赞

我传递的侧面输入功能beam.Map不正确这是正确的方法

x4 = x3 | "3" >> beam.Map(form_pair, x1)而不是x4 = x3 | "3" >> beam.Map(functools.partial(form_pair, side_input=x1))which 是错误的。


查看完整回答
反对 回复 2023-05-23
  • 1 回答
  • 0 关注
  • 67 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信