4 回答

TA貢獻1818條經驗 獲得超3個贊
我稍微調查了一下,發現問題出在 KafkaAvroSerializer/Deserializer 使用的 CashedSchemaRegistryClient 中。它用于從 Confluent Schema Registry 中獲取模式定義。
您已經在本地擁有架構定義,因此您無需為它們轉到架構注冊表。(至少在你的測試中)
我有一個類似的問題,我通過創建自定義 KafkaAvroSerializer/KafkaAvroDeserializer 解決了它。
這是 KafkaAvroSerializer 的示例。這很簡單。您只需要擴展提供的 KafkaAvroSerializer 并告訴他使用 MockSchemaRegistryClient。
public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
? ? public CustomKafkaAvroSerializer() {
? ? ? ? super();
? ? ? ? super.schemaRegistry = new MockSchemaRegistryClient();
? ? }
? ? public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
? ? ? ? super(new MockSchemaRegistryClient());
? ? }
? ? public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
? ? ? ? super(new MockSchemaRegistryClient(), props);
? ? }
}
這是 KafkaAvroDeserializer 的示例。當調用反序列化方法時,您需要告訴他要使用哪個模式。
public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
? ? @Override
? ? public Object deserialize(String topic, byte[] bytes) {
? ? ? ? this.schemaRegistry = getMockClient(KafkaEvent.SCHEMA$);??
? ? ? ? return super.deserialize(topic, bytes);
? ? }
? ? private static SchemaRegistryClient getMockClient(final Schema schema$) {
? ? ? ? return new MockSchemaRegistryClient() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public synchronized Schema getById(int id) {
? ? ? ? ? ? ? ? return schema$;
? ? ? ? ? ? }
? ? ? ? };
? ? }
}
最后一步是告訴 spring 使用創建的序列化器/反序列化器
spring.kafka.producer.properties.schema.registry.url= not-used
spring.kafka.producer.value-serializer = CustomKafkaAvroSerializer
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.group-id = showcase-producer-id
spring.kafka.consumer.properties.schema.registry.url= not-used
spring.kafka.consumer.value-deserializer = CustomKafkaAvroDeserializer
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id = showcase-consumer-id
spring.kafka.auto.offset.reset = earliest
spring.kafka.producer.auto.register.schemas= true
spring.kafka.properties.specific.avro.reader= true

TA貢獻1780條經驗 獲得超1個贊
如果你在 3 年后看這個例子,你可能想要對 CustomKafkaAvroDeserializer 做一些小的修改
private static SchemaRegistryClient getMockClient(final Schema schema) {
return new MockSchemaRegistryClient() {
@Override
public ParsedSchema getSchemaBySubjectAndId(String subject, int id)
throws IOException, RestClientException {
return new AvroSchema(schema);
}
};
}

TA貢獻1829條經驗 獲得超4個贊
如果您的 @KafkaListener 在測試類中,那么您可以在 StringDeserializer 中讀取它,然后手動將其轉換為所需的類
@Autowired
private MyKafkaAvroDeserializer myKafkaAvroDeserializer;
@KafkaListener( topics = "test")
public void inputData(ConsumerRecord<?, ?> consumerRecord) {
log.info("received payload='{}'", consumerRecord.toString(),consumerRecord.value());
GenericRecord genericRecord = (GenericRecord)myKafkaAvroDeserializer.deserialize("test",consumerRecord.value().toString().getBytes(StandardCharsets.UTF_8));
Myclass myclass = (Myclass) SpecificData.get().deepCopy(Myclass.SCHEMA$, genericRecord);
}
@Component
public class MyKafkaAvroDeserializer extends KafkaAvroDeserializer {
@Override
public Object deserialize(String topic, byte[] bytes) {
this.schemaRegistry = getMockClient(Myclass.SCHEMA$);
return super.deserialize(topic, bytes);
}
private static SchemaRegistryClient getMockClient(final Schema schema$) {
return new MockSchemaRegistryClient() {
@Override
public synchronized org.apache.avro.Schema getById(int id) {
return schema$;
}
};
}
}
記得在 application.yml 中添加 schema registry 和 key/value serializer 雖然不會用到
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
schema.registry.url :http://localhost:8080

TA貢獻1875條經驗 獲得超5個贊
如錯誤所述,您需要在生產者配置中向注冊表提供一個字符串,而不是一個對象。
由于您使用的是 Mock 類,因此該字符串可以是任何東西......
但是,您需要在給定注冊表實例的情況下構造序列化程序
Serializer serializer = new KafkaAvroSerializer(mockSchemaRegistry);
// make config map with ("schema.registry.url", "unused")
serializer.configure(config, false);
否則,它將嘗試創建一個非模擬客戶端
并將其放入屬性中
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);
添加回答
舉報