亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

使用 Avro Schema 注冊表的 Kafka 消費者單元測試失敗

使用 Avro Schema 注冊表的 Kafka 消費者單元測試失敗

紅糖糍粑 2023-06-14 14:00:11
我正在編寫一個消費者,它會收聽 Kafka 主題并在消息可用時使用消息。我通過在本地運行 Kafka 測試了邏輯/代碼,它運行良好。在編寫單元/組件測試用例時,它因 avro 架構注冊表 url 錯誤而失敗。我嘗試了互聯網上可用的不同選項,但找不到任何有效的方法。我不確定我的方法是否正確。請幫忙。監聽類@KafkaListener(topics = "positionmgmt.v1", containerFactory = "genericKafkaListenerFactory")    public void receive(ConsumerRecord<String, GenericRecord> consumerRecord) {        try {            GenericRecord generic = consumerRecord.value();            Object obj = generic.get("metadata");            ObjectMapper mapper = new ObjectMapper();            Header headerMetaData = mapper.readValue(obj.toString(), Header.class);            System.out.println("Received payload :   " + consumerRecord.value());            //Call backend with details in GenericRecord         }catch (Exception e){            System.out.println("Exception while reading message from Kafka " + e );        }卡夫卡配置@Bean    public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> genericKafkaListenerFactory() {        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(genericConsumerFactory());        return factory;    }public ConsumerFactory<String, GenericRecord> genericConsumerFactory() {        Map<String, Object> config = new HashMap<>();        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);        config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");        return new DefaultKafkaConsumerFactory<>(config);    }
查看完整描述

4 回答

?
泛舟湖上清波郎朗

TA貢獻1818條經驗 獲得超3個贊

我稍微調查了一下,發現問題出在 KafkaAvroSerializer/Deserializer 使用的 CashedSchemaRegistryClient 中。它用于從 Confluent Schema Registry 中獲取模式定義。


您已經在本地擁有架構定義,因此您無需為它們轉到架構注冊表。(至少在你的測試中)


我有一個類似的問題,我通過創建自定義 KafkaAvroSerializer/KafkaAvroDeserializer 解決了它。


這是 KafkaAvroSerializer 的示例。這很簡單。您只需要擴展提供的 KafkaAvroSerializer 并告訴他使用 MockSchemaRegistryClient。


public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {

? ? public CustomKafkaAvroSerializer() {

? ? ? ? super();

? ? ? ? super.schemaRegistry = new MockSchemaRegistryClient();

? ? }


? ? public CustomKafkaAvroSerializer(SchemaRegistryClient client) {

? ? ? ? super(new MockSchemaRegistryClient());

? ? }


? ? public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {

? ? ? ? super(new MockSchemaRegistryClient(), props);

? ? }

}

這是 KafkaAvroDeserializer 的示例。當調用反序列化方法時,您需要告訴他要使用哪個模式。


public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {

? ? @Override

? ? public Object deserialize(String topic, byte[] bytes) {

? ? ? ? this.schemaRegistry = getMockClient(KafkaEvent.SCHEMA$);??

? ? ? ? return super.deserialize(topic, bytes);

? ? }


? ? private static SchemaRegistryClient getMockClient(final Schema schema$) {

? ? ? ? return new MockSchemaRegistryClient() {

? ? ? ? ? ? @Override

? ? ? ? ? ? public synchronized Schema getById(int id) {

? ? ? ? ? ? ? ? return schema$;

? ? ? ? ? ? }

? ? ? ? };

? ? }

}

最后一步是告訴 spring 使用創建的序列化器/反序列化器


spring.kafka.producer.properties.schema.registry.url= not-used

spring.kafka.producer.value-serializer = CustomKafkaAvroSerializer

spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.group-id = showcase-producer-id


spring.kafka.consumer.properties.schema.registry.url= not-used

spring.kafka.consumer.value-deserializer = CustomKafkaAvroDeserializer

spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.group-id = showcase-consumer-id

spring.kafka.auto.offset.reset = earliest


spring.kafka.producer.auto.register.schemas= true

spring.kafka.properties.specific.avro.reader= true

查看完整回答
反對 回復 2023-06-14
?
慕神8447489

TA貢獻1780條經驗 獲得超1個贊

如果你在 3 年后看這個例子,你可能想要對 CustomKafkaAvroDeserializer 做一些小的修改


private static SchemaRegistryClient getMockClient(final Schema schema) {

        return new MockSchemaRegistryClient() {


     @Override

     public ParsedSchema getSchemaBySubjectAndId(String subject, int id)

                    throws IOException, RestClientException {

         return new AvroSchema(schema);

     }            

 };

}


查看完整回答
反對 回復 2023-06-14
?
浮云間

TA貢獻1829條經驗 獲得超4個贊

如果您的 @KafkaListener 在測試類中,那么您可以在 StringDeserializer 中讀取它,然后手動將其轉換為所需的類


    @Autowired

    private MyKafkaAvroDeserializer myKafkaAvroDeserializer;


    @KafkaListener( topics = "test")

    public void inputData(ConsumerRecord<?, ?> consumerRecord) {

        log.info("received payload='{}'", consumerRecord.toString(),consumerRecord.value());


        GenericRecord genericRecord = (GenericRecord)myKafkaAvroDeserializer.deserialize("test",consumerRecord.value().toString().getBytes(StandardCharsets.UTF_8));



        Myclass myclass = (Myclass) SpecificData.get().deepCopy(Myclass.SCHEMA$, genericRecord);

}

@Component

public class MyKafkaAvroDeserializer extends KafkaAvroDeserializer {

    @Override

    public Object deserialize(String topic, byte[] bytes) {


            this.schemaRegistry = getMockClient(Myclass.SCHEMA$);


        return super.deserialize(topic, bytes);

    }




    private static SchemaRegistryClient getMockClient(final Schema schema$) {

        return new MockSchemaRegistryClient() {

            @Override

            public synchronized org.apache.avro.Schema getById(int id) {

                return schema$;

            }

        };

    }

}

記得在 application.yml 中添加 schema registry 和 key/value serializer 雖然不會用到


    consumer:

      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    properties:

      schema.registry.url :http://localhost:8080


查看完整回答
反對 回復 2023-06-14
?
慕田峪4524236

TA貢獻1875條經驗 獲得超5個贊

如錯誤所述,您需要在生產者配置中向注冊表提供一個字符串,而不是一個對象。


由于您使用的是 Mock 類,因此該字符串可以是任何東西......


但是,您需要在給定注冊表實例的情況下構造序列化程序


Serializer serializer = new KafkaAvroSerializer(mockSchemaRegistry);

 // make config map with ("schema.registry.url", "unused") 

serializer.configure(config, false);

否則,它將嘗試創建一個非模擬客戶端


并將其放入屬性中


producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);


查看完整回答
反對 回復 2023-06-14
  • 4 回答
  • 0 關注
  • 262 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號