为了账号安全,请及时绑定邮箱和手机立即绑定

Apache Beam Go SDK:如何将 PCollection<string> 转换为

Apache Beam Go SDK:如何将 PCollection<string> 转换为

Go
蛊毒传说 2022-11-23 19:49:23
我正在使用 Apache Beam Go SDK 并且很难以正确的格式获取PCollection以按键进行分组/组合。我在 PCollection 的字符串中每个键有多个记录,如下所示:Bob, catBob, dogCarla, catCarla, bunnyDoug, horse我想使用GroupByKey和CombinePerKey,这样我就可以像这样汇总每个人的宠物:Bob, [cat, dog]Carla, [cat, bunny]Doug, [horse]如何将 PCollection<string> 转换为 PCollection<KV<string, string>>?他们在这里提到了类似的东西,但不包括聚合字符串值的代码。我可以使用 ParDo 获取字符串键和字符串值,如下所示,但我不知道如何转换为 GroupPerKey 输入所需的 KV<string, string> 或 CoGBK<string, string> 格式。pcolOut := beam.ParDo(s, func(line string) (string, string) {  cleanString := strings.TrimSpace(line)  openingChar := ","  iStart := strings.Index(cleanString, openingChar)  key := cleanString[0:iStart]  value := cleanString[iStart+1:]        // How to convert to PCollection<KV<string, string>> before returning?  return key, value}, pcolIn)groupedKV := beam.GroupByKey(s, pcolOut) 它失败并出现以下错误。有什么建议么?panic:  inserting ParDo in scope root        creating new DoFn in scope root        binding fn main.main.func2        binding params [{Value string} {Value string}] to input CoGBK<string,string>values of CoGBK<string,string> cannot bind to {Value string}
查看完整描述

1 回答

?
汪汪一只猫

TA贡献1898条经验 获得超8个赞

要映射到 KV,您可以应用 MapElements 并使用 into() 来设置 KV 类型,并在 via() 逻辑中创建一个新KV.of(myKey, myValue)的 ,例如,要获取一个KV<String,String>,请使用以下内容:


    PCollection<KV<String, String>> kvPairs = linkpages.apply(MapElements.into(

        TypeDescriptors.kvs(

            TypeDescriptors.strings(),

            TypeDescriptors.strings()))

        .via(

            linkpage -> KV.of(dataFile, linkpage)));


查看完整回答
反对 回复 2022-11-23
  • 1 回答
  • 0 关注
  • 87 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信