为了账号安全,请及时绑定邮箱和手机立即绑定

Apache Beam Go SDK:如何将 PCollection<string> 转换为

Apache Beam Go SDK:如何将 PCollection<string> 转换为

Go
蛊毒传说 2022-11-23 19:49:23

我正在使用 Apache Beam Go SDK 并且很难以正确的格式获取PCollection以按键进行分组/组合。


我在 PCollection 的字符串中每个键有多个记录,如下所示:


Bob, cat

Bob, dog

Carla, cat

Carla, bunny

Doug, horse

我想使用GroupByKey和CombinePerKey,这样我就可以像这样汇总每个人的宠物:


Bob, [cat, dog]

Carla, [cat, bunny]

Doug, [horse]

如何将 PCollection<string> 转换为 PCollection<KV<string, string>>?


他们在这里提到了类似的东西,但不包括聚合字符串值的代码。


我可以使用 ParDo 获取字符串键和字符串值,如下所示,但我不知道如何转换为 GroupPerKey 输入所需的 KV<string, string> 或 CoGBK<string, string> 格式。


pcolOut := beam.ParDo(s, func(line string) (string, string) {

  cleanString := strings.TrimSpace(line)

  openingChar := ","

  iStart := strings.Index(cleanString, openingChar)

  key := cleanString[0:iStart]

  value := cleanString[iStart+1:]

        

// How to convert to PCollection<KV<string, string>> before returning?

  return key, value

}, pcolIn)


groupedKV := beam.GroupByKey(s, pcolOut) 

它失败并出现以下错误。有什么建议么?


panic:  inserting ParDo in scope root

        creating new DoFn in scope root

        binding fn main.main.func2

        binding params [{Value string} {Value string}] to input CoGBK<string,string>

values of CoGBK<string,string> cannot bind to {Value string}


查看完整描述

1 回答

?
汪汪一只猫

TA贡献1595条经验 获得超8个赞

要映射到 KV,您可以应用 MapElements 并使用 into() 来设置 KV 类型,并在 via() 逻辑中创建一个新KV.of(myKey, myValue)的 ,例如,要获取一个KV<String,String>,请使用以下内容:


    PCollection<KV<String, String>> kvPairs = linkpages.apply(MapElements.into(

        TypeDescriptors.kvs(

            TypeDescriptors.strings(),

            TypeDescriptors.strings()))

        .via(

            linkpage -> KV.of(dataFile, linkpage)));


查看完整回答
反对 回复 2022-11-23

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信