2 回答
TA贡献1993条经验 获得超6个赞
它对我有用。这就是我所做的:
完全像你一样使用配置类
@EnableSchemaRegistryClient在项目中使用了注解将 avro 序列化程序添加到类路径:
io.confluent:kafka-avro-serializer设置属性如下:
spring.cloud.stream.kafka.bindings.channel.consumer.configuration.schema.registry.url=你的注册地址 spring.cloud.stream.kafka.bindings.channel.consumer.configuration.specific.avro.reader=true
其中 channel 对应于您应用中的频道名称。
我认为最后一个属性对于告诉 Spring 使用 Avro 序列化器而不是默认序列化器非常重要。
我使用的是 Spring Cloud Stream Elmhurst.RELEASE,因此如果您使用其他版本,属性的名称可能会略有不同。
TA贡献1808条经验 获得超4个赞
我现在已将 @EnableSchemaRegistryClient 注释移至项目应用程序类,请参阅https://github.com/donalthurley/KafkaConsumeScsAndConfluent/commit/b4cf5427d7ab0a4fed619fe54b042890f5ccb594并重新部署,这解决了我在部署环境时遇到的问题。
我一直在用 @EnableSchemaRegistryClient 注释来注释我的生产者和消费者类。
在我所有的本地测试中,这一直在针对我本地的 docker confluent 模式注册表工作。然而,在部署到我们的环境时,它大部分时间都在工作,但在一些部署后偶尔会失败。
我没有成功地在本地复制这个。
我还在本地测试中注意到,如果我删除 Confluent Schema Registry 的配置,我会得到相同的空指针异常堆栈跟踪。
所以我认为我看到的问题是,当项目应用程序类中不存在 @EnableSchemaRegistryClient 注释时,AvroSchemaRegistryClientMessageConverter bean 没有与融合模式注册表 bean 连接。
我不明白为什么那是必要的,但我认为它可能已经解决了这个问题。
添加回答
举报
