亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何添加額外的自定義標頭,同時拋出異常以將消息存儲在 kafka 上的 dlq 中?

如何添加額外的自定義標頭,同時拋出異常以將消息存儲在 kafka 上的 dlq 中?

幕布斯7119047 2023-11-10 15:51:19
我有一個監聽 kafka 主題的消費者應用程序,在異常情況下,它會按預期將記錄發送到 dlq(死信隊列)。在拋出異常之前,我在消費者應用程序中設置標頭,并且它正在設置,但沒有發送到死信主題,死信主題只有內置標頭,例如“x-exception-message”、“x-”原始分區'等...而不是我設置的那個。以下是我的消費者應用程序中的一段代碼:modifiedMessage = MessageBuilder.fromMessage(consumedMessage).setHeader("x-ecode", new Integer(100)).setHeader(BinderHeaders.PARTITION_OVERRIDE,consumedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID)).build();System.out.println("error header:"+modifiedMessage.getHeaders().get("x-ecode",Integer.class)); //100throw new RuntimeException(modifiedMessage.toString());注意:我在 spring.cloud.stream.kafka.binder.header=x-ecode 下的 application.yml 中設置 x-code在上面的代碼中,我能夠設置標頭,并實際驗證它已設置但未發送到死信主題。有效負載已正確發送,我如何將該標頭發送到死信主題?我是否需要在 application.yml 中添加任何屬性才能使其發送?
查看完整描述

1 回答

?
米琪卡哇伊

TA貢獻1998條經驗 獲得超6個贊

您所做的就是創建一條新消息并丟棄它(除非您將異常消息設置為其字符串實現)。

這不會修改原始入站消息。

您只需添加一個新的輸出綁定并自行將修改后的消息發送給它即可。

或者,如果您只添加一個常量,則可以將 a 添加ChannelInterceptor到綁定的錯誤通道并修改那里的消息。如果您需要將狀態傳遞給攔截器,您可以使用自定義異常。

然而,最簡單的解決方案是自己發布消息,而不是使用綁定器的DLQ機制。您已經有了創建消息的邏輯,因此它只是myCustomDlqBinding.send(modifiedMessage)(還添加了標準 DLQ 標頭)。


查看完整回答
反對 回復 2023-11-10
  • 1 回答
  • 0 關注
  • 164 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號