为了账号安全,请及时绑定邮箱和手机立即绑定

使用默认架构注册表客户端而不是 Avro 架构注册表客户端的

使用默认架构注册表客户端而不是 Avro 架构注册表客户端的

Smart猫小萌 2021-12-30 20:58:15
我们将 Kafka 与 Spring Cloud Stream 一起使用,我们需要连接到 Spring Boot 组件中的 Confluent Schema Registry,请参阅https://github.com/donalthurley/KafkaConsumeScsAndConfluent。我们添加了以下配置来创建所需的 ConfluentSchemaRegistryClient bean,请参阅https://github.com/donalthurley/KafkaConsumeScsAndConfluent/blob/master/src/main/java/com/example/kafka/KafkaConfig.java这应该覆盖默认架构来自 Spring Cloud Stream 的注册表。但是,在一些部署之后,我们间歇性地看到以下故障。org.springframework.messaging.MessageDeliveryException: failed to send Message to channel根本原因显示此堆栈跟踪Caused by: java.lang.NullPointerException    at     org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient.register(DefaultSchemaRegistryClient.java:71)    at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:238)    at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:179)    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:201)    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:191)    at org.springframework.messaging.converter.CompositeMessageConverter.toMessage(CompositeMessageConverter.java:83)    AvroSchmaRegistryClientMessageConverter 正在调用 DefaultSchemaRegistryClient 的事实向我们表明 ConfluentSchemaRegistryClient bean 的接线存在问题。我们的配置中是否还需要其他内容来确保 ConfluentSchemaRegistryClient bean 正确连接?
查看完整描述

2 回答

?
ibeautiful

TA贡献1993条经验 获得超6个赞

它对我有用。这就是我所做的:

  • 完全像你一样使用配置类

  • @EnableSchemaRegistryClient在项目中使用了注解

  • 将 avro 序列化程序添加到类路径: io.confluent:kafka-avro-serializer

  • 设置属性如下:

    spring.cloud.stream.kafka.bindings.channel.consumer.configuration.schema.registry.url=你的注册地址 spring.cloud.stream.kafka.bindings.channel.consumer.configuration.specific.avro.reader=true

其中 channel 对应于您应用中的频道名称。

我认为最后一个属性对于告诉 Spring 使用 Avro 序列化器而不是默认序列化器非常重要。

我使用的是 Spring Cloud Stream Elmhurst.RELEASE,因此如果您使用其他版本,属性的名称可能会略有不同。


查看完整回答
反对 回复 2021-12-30
?
炎炎设计

TA贡献1808条经验 获得超4个赞

我现在已将 @EnableSchemaRegistryClient 注释移至项目应用程序类,请参阅https://github.com/donalthurley/KafkaConsumeScsAndConfluent/commit/b4cf5427d7ab0a4fed619fe54b042890f5ccb594并重新部署,这解决了我在部署环境时遇到的问题。

我一直在用 @EnableSchemaRegistryClient 注释来注释我的生产者和消费者类。

在我所有的本地测试中,这一直在针对我本地的 docker confluent 模式注册表工作。然而,在部署到我们的环境时,它大部分时间都在工作,但在一些部署后偶尔会失败。

我没有成功地在本地复制这个。

我还在本地测试中注意到,如果我删除 Confluent Schema Registry 的配置,我会得到相同的空指针异常堆栈跟踪。

所以我认为我看到的问题是,当项目应用程序类中不存在 @EnableSchemaRegistryClient 注释时,AvroSchemaRegistryClientMessageConverter bean 没有与融合模式注册表 bean 连接。

我不明白为什么那是必要的,但我认为它可能已经解决了这个问题。


查看完整回答
反对 回复 2021-12-30
  • 2 回答
  • 0 关注
  • 241 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号