为了账号安全,请及时绑定邮箱和手机立即绑定

使用循环分支 Apache Beam 管道

使用循环分支 Apache Beam 管道

喵喔喔 2023-06-20 17:27:24
我正在尝试执行去规范化操作,我需要使用以下逻辑重新组织表:| itemid | class | value |+--------+-------+-------+| 1      | A     | 0.2   |       | itemid | value A | value B | value C || 1      | B     | 10.3  |  ==>  +--------+---------+---------+---------+| 2      | A     | 3.0   |  ==>  | 1      |   0.2   |  10.3   |         || 2      | B     | 0.2   |  ==>  | 2      |   3.0   |   0.2   |         || 3      | A     | 0.0   |       | 3      |   0.0   |   1.2   |   5.4   | | 3      | B     | 1.2   |  | 3      | C     | 5.4   |      我的方法是执行一个 for 循环以按 进行过滤class,前提是我知道先验类列表,然后加入生成的 pcollection。高级代码:CLASSES = ["A", "B", "C"]tables = [      (        data        | "Filter by Language" >> beam.Filter(lambda elem: elem["class"]==c)        | "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))    )    for cin CLASSES]和加入:_ = (     tables    | "Flatten" >> beam.Flatten()    | "Join Collections" >> beam.GroupByKey()    | "Remove key" >> beam.MapTuple(lambda _, val: val)    | "Merge dicts" >> beam.ParDo(mergeDicts())    | "Write to GCS" >> beam.io.WriteToText(output_file))与(根据 Peter Kim 的建议进行编辑):class mergeDicts(beam.DoFn):    process(self, elements):        result = {}        for dictionary in elements:            if len(dictionary)>0:                result["itemid"] = dictionary["itemid"]                result["value {}".format(dictionary["class"])] = dictionary["value"]        yield result我这里的问题是,当管道在 Apache Beam 计算引擎中执行时,我获得了由列表的最后一个元素过滤的相同 pcollections,在本例中是 C。[已添加] 看起来 Apache Beam 引擎采用最终状态的迭代变量,这意味着迭代列表的最后一个元素,用于所有调用的分支。我显然采用了错误的方法,但是哪种方法应该是执行此操作的最佳方法?
查看完整描述

3 回答

?
Qyouu

TA贡献1786条经验 获得超11个赞

您遇到的是关于闭包、循环和 Python 作用域的令人惊讶的问题。您可以通过分配变量而不是将其从闭包中拉出来解决这个问题。例如

tables = [  

    (

        data

        # Pass it as a side input to Filter.

        | "Filter by Language" >> beam.Filter(lambda elem, cls: elem["class"], c)

        | "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))

    )

    for c in CLASSES

]

或者


tables = [  

    (

        data

        # Explicitly capture it as a default value in the lambda.

        | "Filter by Language" >> beam.Filter(lambda elem, cls=c: elem["class"])

        | "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))

    )

    for c in CLASSES

]

分区在这里也很有效,既可以避免这个陷阱,也可以表达你的意图。


查看完整回答
反对 回复 2023-06-20
?
宝慕林4294392

TA贡献2021条经验 获得超8个赞

根据您显示的结果表,我假设您希望输出如下所示:


{'itemid': '1', 'value B': 10.3, 'value A': 0.2}

{'itemid': '2', 'value B': 0.2, 'value A': 3.0}

{'itemid': '3', 'value B': 1.2, 'value A': 0.0, 'value C': 5.4}

您的 mergeDicts 正在覆盖值,因为字典每个键只能保存一个值。将 mergeDicts 更新为类似这样的内容以指定键:


class mergeDicts(beam.DoFn):

    process(self, elements):

        result = {}

        for dictionary in elements:

            if len(dictionary)>0:

                result["itemid"] = dictionary["itemid"]

                result["value {}".format(dictionary["class"])] = dictionary["value"]

        yield result


查看完整回答
反对 回复 2023-06-20
?
函数式编程

TA贡献1807条经验 获得超9个赞

我在这里发布一个我自己找到的解决方案,但我没有检查它是否为正确答案,因为我想更好地理解 Beam 引擎的执行逻辑。

为了根据条件获得单独的 pcollection,我没有使用循环中的项目过滤表,而是使用了类beam.Partition。通过直接应用文档中的代码示例,我将 pcollection 分成多个表,准备加入。

这样就避免了这个问题,但是我不清楚为什么 for 循环不能像我预期的那样工作。


查看完整回答
反对 回复 2023-06-20
  • 3 回答
  • 0 关注
  • 118 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信