为了账号安全,请及时绑定邮箱和手机立即绑定

KafkaStreams 如何在流聚合中指定 Serdes?

KafkaStreams 如何在流聚合中指定 Serdes?

红颜莎娜 2022-01-19 10:40:22
我正在开发一个 Kafka 流应用程序,但在弄清楚如何使聚合工作时遇到了一些麻烦。我有一个 KStream bankTransactions,其中键是类型String,值是类型JsonNode,所以我配置了我的应用程序的 Serdes// Definition of the different Serdes used in the streamsfinal Serde<String> stringSerde = Serdes.String();final Serde<JsonNode> jsonSerde = new JsonSerde();final Serde<Long> longSerde = Serdes.Long();config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, jsonSerde.getClass().getName());我想在KTable<String, Long>键相同但值Long将从我的 Json 中提取的值中聚合值。所以首先我写道:KTable<String, Long> totalBalances = bankTransactions        .groupByKey()        .aggregate(                () -> 0L,                (key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),                Materialized.as("bank-total-balance")        );我在运行时收到以下错误:Caused by: org.apache.kafka.streams.errors.StreamsException:A serializer (value: org.apache.kafka.connect.json.JsonSerializer) is not compatible tothe actual value type (value type: java.lang.Long).Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.我知道 Kafka 抱怨是因为我正在尝试使用默认的 Json serdes 来序列化Long. 所以从confluent的文档中阅读我尝试了这个KTable<String, Long> totalBalances = bankTransactions        .groupByKey()        .aggregate(                () -> 0L,                (key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),                Materialized.as("bank-total-balance").withValueSerde(Serdes.Long())        );但后来我在编译时遇到错误:Error:(121, 89) java: incompatible types:org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be convertedto org.apache.kafka.common.serialization.Serde<java.lang.Object>我尝试了不同的方式来编写此代码(例如,使用Serdes.long()而不是 my longSerdes,尝试将类型参数化Materialize,甚至尝试将我的初始化程序和聚合器编写为函数,Java 7 风格),但我无法弄清楚我做错了什么。aggregate所以我的问题很简单:当它们不是默认的 Serdes 时,如何正确指定应该使用的 Serdes?
查看完整描述

1 回答

?
倚天杖

TA贡献1828条经验 获得超3个赞

似乎正确的语法如下:


KTable<String, Long> totalBalances = bankTransactions

        .groupByKey()

        .aggregate(

                () -> 0L,

                (key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),

                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("bank-total-balances")

                        .withKeySerde(stringSerde)

                        .withValueSerde(longSerde)

        );

之后的三种类型Materialize.是键、值和用于物化 KTable 的存储的一种,这一种不应该改变。然后我们可以在这个键值存储中定义用于写入的 Serdes。


请注意,我从 github 上的随机存储库中获得了此语法,我仍然很乐意接受由一些文档支持的更精确答案的答案。


查看完整回答
反对 回复 2022-01-19
  • 1 回答
  • 0 关注
  • 318 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号