我有一個 akka 應用程序(在 JAVA 中),用于commitablePartitionedSource使用來自 kafka 主題的消息。我有幾個消費者群體,可以為多個主題吸引消費者。這是由動態配置驅動的,我可以在其中暫時關閉消費者,并可能在稍后重新啟動它們。當這個消費者重新啟動時,我只想閱讀新消息,而不是從我離開的地方開始。有沒有辦法從 akka-alpakka 消費者那里獲取 kafkaConsumer 對象,以便我可以在處理之前使用 seekToEnd()?請讓我知道是否還有其他方法可以實現這一目標?也許使用 akka 配置或不同類型的消費者?我不想維護自己的偏移量(希望不是唯一的選擇)我的配置設置為latest在我啟動消費者組時獲取偏移量,但由于我正在關閉并重新啟動單個消費者,它總是從我停止的地方開始消費。我嘗試為一個主題創建一個消費者組,但我有很多主題,而且結果非常耗費資源。我還尋找一種方法來清除存儲在 kafka 中的該主題的偏移量,但沒有成功。
2 回答

叮當貓咪
TA貢獻1776條經驗 獲得超12個贊
最簡單的方法就是在每次啟動重新啟動消費者時創建一個新的消費者組。Kafka 將在可配置的時間量 ( retention.ms ) 后負責刪除陳舊的消費者組。
如果您很少重啟消費者并且總是希望它處理新數據而不是趕上所有丟失的消息,則此策略很好。
編輯
據我所知,訪問底層 KafkaConsumer 的唯一方法是使用committableExternalSource
. 這樣您就可以訪問該seekToEnd
方法,但是您還需要注意訂閱提供每個分區的起始偏移量的主題(類似于您現在在AkkacommittablePartitionedSource
之外設置的方式)。

明月笑刀無情
TA貢獻1828條經驗 獲得超4個贊
commitablePartitionedSource將AutoSubscription其作為輸入,您不能指定偏移量。
您需要的是一種采用ManualSubscription或更高級別的方法Subscription,例如
> plainExternalSource
> committableExternalSource
> plainSource
...
添加回答
舉報
0/150
提交
取消