3 回答
TA贡献1786条经验 获得超11个赞
您遇到的是关于闭包、循环和 Python 作用域的令人惊讶的问题。您可以通过分配变量而不是将其从闭包中拉出来解决这个问题。例如
tables = [
(
data
# Pass it as a side input to Filter.
| "Filter by Language" >> beam.Filter(lambda elem, cls: elem["class"], c)
| "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))
)
for c in CLASSES
]
或者
tables = [
(
data
# Explicitly capture it as a default value in the lambda.
| "Filter by Language" >> beam.Filter(lambda elem, cls=c: elem["class"])
| "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))
)
for c in CLASSES
]
分区在这里也很有效,既可以避免这个陷阱,也可以表达你的意图。
TA贡献2021条经验 获得超8个赞
根据您显示的结果表,我假设您希望输出如下所示:
{'itemid': '1', 'value B': 10.3, 'value A': 0.2}
{'itemid': '2', 'value B': 0.2, 'value A': 3.0}
{'itemid': '3', 'value B': 1.2, 'value A': 0.0, 'value C': 5.4}
您的 mergeDicts 正在覆盖值,因为字典每个键只能保存一个值。将 mergeDicts 更新为类似这样的内容以指定键:
class mergeDicts(beam.DoFn):
process(self, elements):
result = {}
for dictionary in elements:
if len(dictionary)>0:
result["itemid"] = dictionary["itemid"]
result["value {}".format(dictionary["class"])] = dictionary["value"]
yield result
TA贡献1807条经验 获得超9个赞
我在这里发布一个我自己找到的解决方案,但我没有检查它是否为正确答案,因为我想更好地理解 Beam 引擎的执行逻辑。
为了根据条件获得单独的 pcollection,我没有使用循环中的项目过滤表,而是使用了类beam.Partition
。通过直接应用文档中的代码示例,我将 pcollection 分成多个表,准备加入。
这样就避免了这个问题,但是我不清楚为什么 for 循环不能像我预期的那样工作。
添加回答
举报