亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何獲取時間戳指定的kafka偏移數據

如何獲取時間戳指定的kafka偏移數據

慕桂英4014372 2023-02-16 16:40:42
當我嘗試運行時拋出空指針錯誤時,我試圖根據時間戳獲取 Kafka 主題的偏移量,Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();              for (TopicPartition partition : partitions) {                timestampsToSearch.put(partition,  startTimestamp);              }Map<TopicPartition, OffsetAndTimestamp> outOffsets = consumer.offsetsForTimes(timestampsToSearch);              for (TopicPartition partition : partitions) {                Long seekOffset = outOffsets.get(partition).offset();consumer.seek(partition, seekOffset);任何幫助將不勝感激。
查看完整描述

2 回答

?
楊__羊羊

TA貢獻1943條經驗 獲得超7個贊

要找到與時間戳對應的偏移量,您需要使用 方法offsetsForTimes()。

例如,這將打印mytopic對應于 1 秒前的分區 0 的偏移量:

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);) {
    Map<TopicPartition, Long> timestamps = new HashMap<>();
    timestamps.put(new TopicPartition("mytopic", 0), System.currentTimeMillis()-1*1000);
    Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
    System.err.println(offsets);
}

這將顯示如下內容:

{offset-test-0=(timestamp=1561469319192, leaderEpoch=0, offset=100131)}


查看完整回答
反對 回復 2023-02-16
?
白豬掌柜的

TA貢獻1893條經驗 獲得超10個贊

您可以Admin.listOffsets使用OffsetSpec.forTimestamp

Map<TopicPartition, OffsetSpec> topicOffsetSpecs = new HashMap<>();

TopicPartition topicPartition = new TopicPartition("topic1", 0);

OffsetSpec offsetSpec = OffsetSpec.forTimestamp(timestamp);

topicOffsetSpecs.put(topicPartition, offsetSpec);

admin.listOffsets(topicOffsetSpecs).all().get(); // Info for given timestamp


查看完整回答
反對 回復 2023-02-16
  • 2 回答
  • 0 關注
  • 496 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號