我有一個監聽 kafka 主題的消費者應用程序,在異常情況下,它會按預期將記錄發送到 dlq(死信隊列)。在拋出異常之前,我在消費者應用程序中設置標頭,并且它正在設置,但沒有發送到死信主題,死信主題只有內置標頭,例如“x-exception-message”、“x-”原始分區'等...而不是我設置的那個。以下是我的消費者應用程序中的一段代碼:modifiedMessage = MessageBuilder.fromMessage(consumedMessage).setHeader("x-ecode", new Integer(100)).setHeader(BinderHeaders.PARTITION_OVERRIDE,consumedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID)).build();System.out.println("error header:"+modifiedMessage.getHeaders().get("x-ecode",Integer.class)); //100throw new RuntimeException(modifiedMessage.toString());注意:我在 spring.cloud.stream.kafka.binder.header=x-ecode 下的 application.yml 中設置 x-code在上面的代碼中,我能夠設置標頭,并實際驗證它已設置但未發送到死信主題。有效負載已正確發送,我如何將該標頭發送到死信主題?我是否需要在 application.yml 中添加任何屬性才能使其發送?
如何添加額外的自定義標頭,同時拋出異常以將消息存儲在 kafka 上的 dlq 中?
幕布斯7119047
2023-11-10 15:51:19