我有一個 SchemaRegistry 和一個 KafkaBroker,我使用 Avro v1.8.1 從中提取數據。對于反序列化,我一直在使用 Confluent 的KafkaAvroDeserializer。現在我打算重構我的代碼以使用 Alpakka 提供的Elasticsearch API,但不幸的是這會破壞反序列化,因為它會導致 NullPointerExceptions:線程“main”中的異常 org.apache.kafka.common.errors.SerializationException:在偏移量 0 處反序列化分區 topic-0 的鍵/值時出錯。如果需要,請尋找過去的記錄以繼續消費。原因:org.apache.kafka.common.errors.SerializationException:反序列化 id 2 的 Avro 消息時出錯 原因:io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)處的 java.lang.NullPointerException io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88) 在 io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) 在 org.apache.kafka.Deserialization.Deserializer.common.serializer在 org.apache.kafka.clients.consumer 中反序列化(Deserializer.java:58)。我一直在使用 Alpakka 的 ConsumerSettings API,如本例中所述:val system = ActorSystem.create();// necessary to convert timestamps correctly in Avro Version 1.8.1 to avoid ClassCastExceptionsSpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());val consumerSettings = ConsumerSettings.create(system, new StringDeserializer(), new KafkaAvroDeserializer()) .withBootstrapServers(kafkaBootstrapServerUrl) .withClientId(InetAddress.getLocalHost().getHostName()) .withGroupId("" + new Random().nextInt()) .withProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl) .withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true") .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") .withStopTimeout(Duration.ofSeconds(5));
1 回答

倚天杖
TA貢獻1828條經驗 獲得超3個贊
我認為您需要拉到new KafkaAvroDeserializer()
它自己的變量,然后調用該.configure()
實例上的方法以傳入非空注冊表 URL。
然后將配置的實例傳入ConsumerSettings.create
FWIW,根據您的需要,Kafka Connect 可以很好地加載 Elasticsearch
添加回答
舉報
0/150
提交
取消