亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

當我使用 akka 流在現有消費者組中創建新消費者時,如何尋求結束 kafka 主題?

當我使用 akka 流在現有消費者組中創建新消費者時,如何尋求結束 kafka 主題?

浮云間 2023-02-16 16:10:18
我有一個 akka 應用程序(在 JAVA 中),用于commitablePartitionedSource使用來自 kafka 主題的消息。我有幾個消費者群體,可以為多個主題吸引消費者。這是由動態配置驅動的,我可以在其中暫時關閉消費者,并可能在稍后重新啟動它們。當這個消費者重新啟動時,我只想閱讀新消息,而不是從我離開的地方開始。有沒有辦法從 akka-alpakka 消費者那里獲取 kafkaConsumer 對象,以便我可以在處理之前使用 seekToEnd()?請讓我知道是否還有其他方法可以實現這一目標?也許使用 akka 配置或不同類型的消費者?我不想維護自己的偏移量(希望不是唯一的選擇)我的配置設置為latest在我啟動消費者組時獲取偏移量,但由于我正在關閉并重新啟動單個消費者,它總是從我停止的地方開始消費。我嘗試為一個主題創建一個消費者組,但我有很多主題,而且結果非常耗費資源。我還尋找一種方法來清除存儲在 kafka 中的該主題的偏移量,但沒有成功。
查看完整描述

2 回答

?
叮當貓咪

TA貢獻1776條經驗 獲得超12個贊

最簡單的方法就是在每次啟動重新啟動消費者時創建一個新的消費者組。Kafka 將在可配置的時間量 ( retention.ms ) 后負責刪除陳舊的消費者組。

如果您很少重啟消費者并且總是希望它處理新數據而不是趕上所有丟失的消息,則此策略很好。

編輯

據我所知,訪問底層 KafkaConsumer 的唯一方法是使用committableExternalSource. 這樣您就可以訪問該seekToEnd方法,但是您還需要注意訂閱提供每個分區的起始偏移量的主題(類似于您現在在AkkacommittablePartitionedSource之外設置的方式)。


查看完整回答
反對 回復 2023-02-16
?
明月笑刀無情

TA貢獻1828條經驗 獲得超4個贊

commitablePartitionedSource將AutoSubscription其作為輸入,您不能指定偏移量。


您需要的是一種采用ManualSubscription或更高級別的方法Subscription,例如


> plainExternalSource

> committableExternalSource

> plainSource

...


查看完整回答
反對 回復 2023-02-16
  • 2 回答
  • 0 關注
  • 139 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號