亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

使用 Spring Kafka 反序列化來自同一 Kafka 主題的不同 JSON 有效負載

使用 Spring Kafka 反序列化來自同一 Kafka 主題的不同 JSON 有效負載

慕娘9325324 2022-05-21 20:31:53
我正在嘗試反序列化來自同一 Kafka 主題的不同 JSON 有效負載。這里提出的其他問題引導我進行了第一次嘗試,但我無法讓它運行。正如 Gary 提到的(這里)有一些提示(JsonSerializer.ADD_TYPE_INFO_HEADERS),但是當我發送和接收兩條消息時,我得到一個異常。org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming messageEndpoint handler details:Method [public void com.foo.message.ConsumerImpl.consumeSelf(java.lang.String,java.lang.String,java.lang.String,java.lang.String,java.util.Map<java.lang.String, java.lang.Object>,com.foo.message.KafkaMessage,org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.Object>)]Bean [com.foo.message.ConsumerImpl@6df2a206]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.foo.message.KafkaMessageWithAdditionalField] to [com.foo.message.KafkaMessage] for GenericMessage [payload=com.foo.message.KafkaMessageWithAdditionalField@4e3168f7, headers={kafka_offset=22, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@c0e2fcf, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=fromBar, kafka_receivedTimestamp=1548310583481}], failedMessage=GenericMessage [payload=com.foo.message.KafkaMessageWithAdditionalField@4e3168f7, headers={kafka_offset=22, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@c0e2fcf, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=fromBar, kafka_receivedTimestamp=1548310583481}]...
查看完整描述

1 回答

?
絕地無雙

TA貢獻1946條經驗 獲得超4個贊

你不能那樣做;您有 2 個不同的偵聽器容器,其中的偵聽器期望不同的對象。


對于接收不同類型的多個監聽器方法,需要@KafkaListener在類級別和@KafkaHandler方法級別使用。


請參閱Class 上的@KafkaListener。


在類級別使用@KafkaListener 時,您在方法級別指定@KafkaHandler。傳遞消息時,轉換后的消息負載類型用于確定調用哪個方法。


@KafkaListener(id = "multi", topics = "myTopic")

static class MultiListenerBean {


    @KafkaHandler

    public void listen(String foo) {

        ...

    }


    @KafkaHandler

    public void listen(Integer bar) {

        ...

    }


    @KafkaHandler(isDefault = true`)

    public void listenDefault(Object object) {

        ...

    }


}

默認方法是可選的,用于未知的有效負載類型。


但這僅適用于智能反序列化器(知道如何轉換為不同的有效負載)。


或者,您可以RecordFilterStrategy向偵聽器容器工廠添加一個以跳過每個偵聽器中的其他記錄。


查看完整回答
反對 回復 2022-05-21
  • 1 回答
  • 0 關注
  • 181 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號