为了账号安全,请及时绑定邮箱和手机立即绑定

使用 Spring Kafka 反序列化来自同一 Kafka 主题的不同 JSON 有效负载

使用 Spring Kafka 反序列化来自同一 Kafka 主题的不同 JSON 有效负载

慕娘9325324 2022-05-21 20:31:53
我正在尝试反序列化来自同一 Kafka 主题的不同 JSON 有效负载。这里提出的其他问题引导我进行了第一次尝试,但我无法让它运行。正如 Gary 提到的(这里)有一些提示(JsonSerializer.ADD_TYPE_INFO_HEADERS),但是当我发送和接收两条消息时,我得到一个异常。org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming messageEndpoint handler details:Method [public void com.foo.message.ConsumerImpl.consumeSelf(java.lang.String,java.lang.String,java.lang.String,java.lang.String,java.util.Map<java.lang.String, java.lang.Object>,com.foo.message.KafkaMessage,org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.Object>)]Bean [com.foo.message.ConsumerImpl@6df2a206]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.foo.message.KafkaMessageWithAdditionalField] to [com.foo.message.KafkaMessage] for GenericMessage [payload=com.foo.message.KafkaMessageWithAdditionalField@4e3168f7, headers={kafka_offset=22, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@c0e2fcf, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=fromBar, kafka_receivedTimestamp=1548310583481}], failedMessage=GenericMessage [payload=com.foo.message.KafkaMessageWithAdditionalField@4e3168f7, headers={kafka_offset=22, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@c0e2fcf, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=fromBar, kafka_receivedTimestamp=1548310583481}]...
查看完整描述

1 回答

?
绝地无双

TA贡献1946条经验 获得超4个赞

你不能那样做;您有 2 个不同的侦听器容器,其中的侦听器期望不同的对象。


对于接收不同类型的多个监听器方法,需要@KafkaListener在类级别和@KafkaHandler方法级别使用。


请参阅Class 上的@KafkaListener。


在类级别使用@KafkaListener 时,您在方法级别指定@KafkaHandler。传递消息时,转换后的消息负载类型用于确定调用哪个方法。


@KafkaListener(id = "multi", topics = "myTopic")

static class MultiListenerBean {


    @KafkaHandler

    public void listen(String foo) {

        ...

    }


    @KafkaHandler

    public void listen(Integer bar) {

        ...

    }


    @KafkaHandler(isDefault = true`)

    public void listenDefault(Object object) {

        ...

    }


}

默认方法是可选的,用于未知的有效负载类型。


但这仅适用于智能反序列化器(知道如何转换为不同的有效负载)。


或者,您可以RecordFilterStrategy向侦听器容器工厂添加一个以跳过每个侦听器中的其他记录。


查看完整回答
反对 回复 2022-05-21
  • 1 回答
  • 0 关注
  • 207 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号