为了账号安全,请及时绑定邮箱和手机立即绑定

卡夫卡流分组依据和串联

卡夫卡流分组依据和串联

慕后森 2022-09-21 16:42:23

我有一个接收记录的 Kafka 流,我想根据特定字段连接消息。


流中的消息如下所示:


Key: 2099

Payload{

  email: tom@emample.com

  eventCode: 2099

}

预期输出:


key: 2099

Payload{

    emails: tom@example, bill@acme.com, jane@example.com

}

我可以让溪流运行良好,我只是不确定lamda应该包含什么。


这就是我迄今为止所做的。我不确定我是否应该使用映射,聚合或减少或组合这些操作。


final StreamsBuilder builder = new StreamsBuilder();

KStream<String, Payload> inputStream = builder.stream(INPUT_TOPIC);


inputStream

        .groupByKey()

        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(300000)))


                                  // Not sure what to do here …..


}).to (OUTPUT_TOPIC );


查看完整描述

1 回答

?
莫回无

TA贡献1554条经验 获得超7个赞

它可能是这样的东西


inputStream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(300000)))

.aggregate(PayloadAggr::new, new Aggregator<String, Payload, PayloadAggr>() {

        @Override

        public PayloadAggr apply(String key, Payload newValue, PayloadAggr result) {

            result.setKey(key);

            if(result.getEmails()==null){

                result.setEmails(newValue.getEmail());

            }else{

                result.setEmails(result.getEmails() + "," + newValue.getEmail());

            }

            return result;

        }

    }, .../* You serdes and store */}).toStream().to(OUTPUT_TOPIC);


查看完整回答
反对 回复 2022-09-21

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信