亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

有沒有辦法從 Kafka 主題獲取最后一條消息?

有沒有辦法從 Kafka 主題獲取最后一條消息?

瀟湘沐 2023-06-28 15:28:26
我有一個具有多個分區的 Kafka 主題,我想知道 Java 中是否有一種方法可以獲取該主題的最后一條消息。我不關心分區,我只想獲取最新消息。我已經嘗試過@KafkaListener,但它僅在主題更新時才獲取消息。如果應用程序打開后沒有發布任何內容,則不會返回任何內容。也許傾聽者根本就不是解決問題的正確方法?
查看完整描述

2 回答

?
牛魔王的故事

TA貢獻1830條經驗 獲得超3個贊

以下片段對我有用。你可以試試這個。評論里有解釋。


        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singletonList(topic));


        consumer.poll(Duration.ofSeconds(10));


        consumer.assignment().forEach(System.out::println);


        AtomicLong maxTimestamp = new AtomicLong();

        AtomicReference<ConsumerRecord<String, String>> latestRecord = new AtomicReference<>();


        // get the last offsets for each partition

        consumer.endOffsets(consumer.assignment()).forEach((topicPartition, offset) -> {

            System.out.println("offset: "+offset);


            // seek to the last offset of each partition

            consumer.seek(topicPartition, (offset==0) ? offset:offset - 1);


            // poll to get the last record in each partition

            consumer.poll(Duration.ofSeconds(10)).forEach(record -> {


                // the latest record in the 'topic' is the one with the highest timestamp

                if (record.timestamp() > maxTimestamp.get()) {

                    maxTimestamp.set(record.timestamp());

                    latestRecord.set(record);

                }

            });

        });

        System.out.println(latestRecord.get());


查看完整回答
反對 回復 2023-06-28
?
森林海

TA貢獻2011條經驗 獲得超2個贊

您必須使用每個分區中的最新消息,然后在客戶端進行比較(使用消息上的時間戳,如果包含它)。原因是 Kafka 不保證分區間的順序。在分區內,您可以確定偏移量最大的消息是最新推送到該分區的消息。



查看完整回答
反對 回復 2023-06-28
  • 2 回答
  • 0 關注
  • 367 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號