为了账号安全,请及时绑定邮箱和手机立即绑定

弹簧靴嵌入式卡夫卡无法连接

弹簧靴嵌入式卡夫卡无法连接

偶然的你 2022-09-21 16:44:33

我正在尝试为我的 Kafka 使用者编写集成测试。我已经遵循了官方参考文档,但是当我开始测试时,我只看到这个重复的广告无限期:


-2019-04-03 15:47:34.002 WARN 13120 --- [主] 组织.apache.kafka.clients.Network客户端 : [消费者客户端 Id=消费者-1,groupId=我的-组] 无法建立与节点 -1 的连接。经纪人可能不可用。


我做错了什么?


我正在使用 JUnit5、弹簧靴和 .spring-kafkaspring-kafka-test


我有我的班级的注释。@EnableKafka@Configuration


我的测试类是这样的:


@ExtendWith(SpringExtension::class)

@SpringBootTest(classes = [TestKafkaConfig::class])

@DirtiesContext

@EmbeddedKafka(

        partitions = 1,

        topics = [KafkaIntegrationTest.MY_TOPIC])

class KafkaIntegrationTest {


    @Autowired

    private lateinit var embeddedKafka: EmbeddedKafkaBroker


    @Test

    fun test() {

        val senderProps = KafkaTestUtils.senderProps(embeddedKafka.brokersAsString)

        val template = KafkaTemplate(DefaultKafkaProducerFactory<Int, String>(senderProps))

        template.defaultTopic = KafkaIntegrationTest.MY_TOPIC

        template.sendDefault("foo")

    }

}

我看起来像这样:application.yml


kafka:

  consumer:

    group-id: my-group

    bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:9092}

    value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    properties:

      schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}

      specific.avro.reader: true

我也尝试过设置一个,但我得到完全相同的重复消息。(这就是我尝试设置的方式):MockSchemaRegistryClientMockSchemaRegistryClient


@TestConfiguration

@Import(TestConfig::class)

class TestKafkaConfig {


    @Autowired

    private lateinit var props: KafkaProperties


    @Bean

    fun schemaRegistryClient() = MockSchemaRegistryClient()


    @Bean

    fun kafkaAvroSerializer() = KafkaAvroSerializer(schemaRegistryClient())


    @Bean

    fun kafkaAvroDeserializer() = KafkaAvroDeserializer(schemaRegistryClient(), props.buildConsumerProperties())


我做错了什么?请注意,我正在使用融合模式注册表,并尝试从Avro反序列化。

查看完整描述

1 回答

?
炎炎设计

TA贡献1484条经验 获得超3个赞

我相信您错过了为测试设置代理 URL。

文档中有一条关于如何获取此值的说明:

当嵌入式卡夫卡和嵌入式动物园管理员服务器由嵌入式卡夫布鲁克启动时,名为 spring.embedded.kafka.brokers 的系统属性将设置为卡夫卡代理的地址,而名为 spring.embedded.zookeeper.连接的系统属性将设置为动物园管理员的地址。为此属性提供了方便的常量(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS和EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT)。

(它位于此处的 junit 部分的底部)

解决此问题的一种方法是在测试中将此值设置为此值,例如kafka.consumers.bootstrap-servers

spring:
    kafka:
        consumer:
            bootstrap-servers: ${spring.embedded.kafka.brokers}


查看完整回答
反对 回复 2022-09-21

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信