为了账号安全,请及时绑定邮箱和手机立即绑定

使用 Avro Schema 注册表的 Kafka 消费者单元测试失败

使用 Avro Schema 注册表的 Kafka 消费者单元测试失败

红糖糍粑 2023-06-14 14:00:11
我正在编写一个消费者,它会收听 Kafka 主题并在消息可用时使用消息。我通过在本地运行 Kafka 测试了逻辑/代码,它运行良好。在编写单元/组件测试用例时,它因 avro 架构注册表 url 错误而失败。我尝试了互联网上可用的不同选项,但找不到任何有效的方法。我不确定我的方法是否正确。请帮忙。监听类@KafkaListener(topics = "positionmgmt.v1", containerFactory = "genericKafkaListenerFactory")    public void receive(ConsumerRecord<String, GenericRecord> consumerRecord) {        try {            GenericRecord generic = consumerRecord.value();            Object obj = generic.get("metadata");            ObjectMapper mapper = new ObjectMapper();            Header headerMetaData = mapper.readValue(obj.toString(), Header.class);            System.out.println("Received payload :   " + consumerRecord.value());            //Call backend with details in GenericRecord         }catch (Exception e){            System.out.println("Exception while reading message from Kafka " + e );        }卡夫卡配置@Bean    public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> genericKafkaListenerFactory() {        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(genericConsumerFactory());        return factory;    }public ConsumerFactory<String, GenericRecord> genericConsumerFactory() {        Map<String, Object> config = new HashMap<>();        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);        config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");        return new DefaultKafkaConsumerFactory<>(config);    }
查看完整描述

4 回答

?
泛舟湖上清波郎朗

TA贡献1818条经验 获得超3个赞

我稍微调查了一下,发现问题出在 KafkaAvroSerializer/Deserializer 使用的 CashedSchemaRegistryClient 中。它用于从 Confluent Schema Registry 中获取模式定义。


您已经在本地拥有架构定义,因此您无需为它们转到架构注册表。(至少在你的测试中)


我有一个类似的问题,我通过创建自定义 KafkaAvroSerializer/KafkaAvroDeserializer 解决了它。


这是 KafkaAvroSerializer 的示例。这很简单。您只需要扩展提供的 KafkaAvroSerializer 并告诉他使用 MockSchemaRegistryClient。


public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {

    public CustomKafkaAvroSerializer() {

        super();

        super.schemaRegistry = new MockSchemaRegistryClient();

    }


    public CustomKafkaAvroSerializer(SchemaRegistryClient client) {

        super(new MockSchemaRegistryClient());

    }


    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {

        super(new MockSchemaRegistryClient(), props);

    }

}

这是 KafkaAvroDeserializer 的示例。当调用反序列化方法时,您需要告诉他要使用哪个模式。


public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {

    @Override

    public Object deserialize(String topic, byte[] bytes) {

        this.schemaRegistry = getMockClient(KafkaEvent.SCHEMA$);  

        return super.deserialize(topic, bytes);

    }


    private static SchemaRegistryClient getMockClient(final Schema schema$) {

        return new MockSchemaRegistryClient() {

            @Override

            public synchronized Schema getById(int id) {

                return schema$;

            }

        };

    }

}

最后一步是告诉 spring 使用创建的序列化器/反序列化器


spring.kafka.producer.properties.schema.registry.url= not-used

spring.kafka.producer.value-serializer = CustomKafkaAvroSerializer

spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.group-id = showcase-producer-id


spring.kafka.consumer.properties.schema.registry.url= not-used

spring.kafka.consumer.value-deserializer = CustomKafkaAvroDeserializer

spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.group-id = showcase-consumer-id

spring.kafka.auto.offset.reset = earliest


spring.kafka.producer.auto.register.schemas= true

spring.kafka.properties.specific.avro.reader= true

查看完整回答
反对 回复 2023-06-14
?
慕神8447489

TA贡献1780条经验 获得超1个赞

如果你在 3 年后看这个例子,你可能想要对 CustomKafkaAvroDeserializer 做一些小的修改


private static SchemaRegistryClient getMockClient(final Schema schema) {

        return new MockSchemaRegistryClient() {


     @Override

     public ParsedSchema getSchemaBySubjectAndId(String subject, int id)

                    throws IOException, RestClientException {

         return new AvroSchema(schema);

     }            

 };

}


查看完整回答
反对 回复 2023-06-14
?
浮云间

TA贡献1829条经验 获得超4个赞

如果您的 @KafkaListener 在测试类中,那么您可以在 StringDeserializer 中读取它,然后手动将其转换为所需的类


    @Autowired

    private MyKafkaAvroDeserializer myKafkaAvroDeserializer;


    @KafkaListener( topics = "test")

    public void inputData(ConsumerRecord<?, ?> consumerRecord) {

        log.info("received payload='{}'", consumerRecord.toString(),consumerRecord.value());


        GenericRecord genericRecord = (GenericRecord)myKafkaAvroDeserializer.deserialize("test",consumerRecord.value().toString().getBytes(StandardCharsets.UTF_8));



        Myclass myclass = (Myclass) SpecificData.get().deepCopy(Myclass.SCHEMA$, genericRecord);

}

@Component

public class MyKafkaAvroDeserializer extends KafkaAvroDeserializer {

    @Override

    public Object deserialize(String topic, byte[] bytes) {


            this.schemaRegistry = getMockClient(Myclass.SCHEMA$);


        return super.deserialize(topic, bytes);

    }




    private static SchemaRegistryClient getMockClient(final Schema schema$) {

        return new MockSchemaRegistryClient() {

            @Override

            public synchronized org.apache.avro.Schema getById(int id) {

                return schema$;

            }

        };

    }

}

记得在 application.yml 中添加 schema registry 和 key/value serializer 虽然不会用到


    consumer:

      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    properties:

      schema.registry.url :http://localhost:8080


查看完整回答
反对 回复 2023-06-14
?
慕田峪4524236

TA贡献1875条经验 获得超5个赞

如错误所述,您需要在生产者配置中向注册表提供一个字符串,而不是一个对象。


由于您使用的是 Mock 类,因此该字符串可以是任何东西......


但是,您需要在给定注册表实例的情况下构造序列化程序


Serializer serializer = new KafkaAvroSerializer(mockSchemaRegistry);

 // make config map with ("schema.registry.url", "unused") 

serializer.configure(config, false);

否则,它将尝试创建一个非模拟客户端


并将其放入属性中


producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);


查看完整回答
反对 回复 2023-06-14
  • 4 回答
  • 0 关注
  • 137 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信