为了账号安全,请及时绑定邮箱和手机立即绑定

弹簧靴嵌入式卡夫卡无法连接

弹簧靴嵌入式卡夫卡无法连接

偶然的你 2022-09-21 16:44:33
我正在尝试为我的 Kafka 使用者编写集成测试。我已经遵循了官方参考文档,但是当我开始测试时,我只看到这个重复的广告无限期:-2019-04-03 15:47:34.002 WARN 13120 --- [主] 组织.apache.kafka.clients.Network客户端 : [消费者客户端 Id=消费者-1,groupId=我的-组] 无法建立与节点 -1 的连接。经纪人可能不可用。我做错了什么?我正在使用 JUnit5、弹簧靴和 .spring-kafkaspring-kafka-test我有我的班级的注释。@EnableKafka@Configuration我的测试类是这样的:@ExtendWith(SpringExtension::class)@SpringBootTest(classes = [TestKafkaConfig::class])@DirtiesContext@EmbeddedKafka(        partitions = 1,        topics = [KafkaIntegrationTest.MY_TOPIC])class KafkaIntegrationTest {    @Autowired    private lateinit var embeddedKafka: EmbeddedKafkaBroker    @Test    fun test() {        val senderProps = KafkaTestUtils.senderProps(embeddedKafka.brokersAsString)        val template = KafkaTemplate(DefaultKafkaProducerFactory<Int, String>(senderProps))        template.defaultTopic = KafkaIntegrationTest.MY_TOPIC        template.sendDefault("foo")    }}我看起来像这样:application.ymlkafka:  consumer:    group-id: my-group    bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:9092}    value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer    properties:      schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}      specific.avro.reader: true我也尝试过设置一个,但我得到完全相同的重复消息。(这就是我尝试设置的方式):MockSchemaRegistryClientMockSchemaRegistryClient@TestConfiguration@Import(TestConfig::class)class TestKafkaConfig {    @Autowired    private lateinit var props: KafkaProperties    @Bean    fun schemaRegistryClient() = MockSchemaRegistryClient()    @Bean    fun kafkaAvroSerializer() = KafkaAvroSerializer(schemaRegistryClient())    @Bean    fun kafkaAvroDeserializer() = KafkaAvroDeserializer(schemaRegistryClient(), props.buildConsumerProperties())我做错了什么?请注意,我正在使用融合模式注册表,并尝试从Avro反序列化。
查看完整描述

1 回答

?
炎炎设计

TA贡献1808条经验 获得超4个赞

我相信您错过了为测试设置代理 URL。

文档中有一条关于如何获取此值的说明:

当嵌入式卡夫卡和嵌入式动物园管理员服务器由嵌入式卡夫布鲁克启动时,名为 spring.embedded.kafka.brokers 的系统属性将设置为卡夫卡代理的地址,而名为 spring.embedded.zookeeper.连接的系统属性将设置为动物园管理员的地址。为此属性提供了方便的常量(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS和EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT)。

(它位于此处的 junit 部分的底部)

解决此问题的一种方法是在测试中将此值设置为此值,例如kafka.consumers.bootstrap-servers

spring:
    kafka:
        consumer:
            bootstrap-servers: ${spring.embedded.kafka.brokers}


查看完整回答
反对 回复 2022-09-21
  • 1 回答
  • 0 关注
  • 53 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信