我正在嘗試在 ksqldb 之上構建一個應用程序。假設我將有一個簡單的生產者:package mainimport ( "fmt" "github.com/rmoff/ksqldb-go" "net/http")var client = ksqldb.NewClient("http://localhost:8088", "", "").Debug()func init() { offset := `SET 'auto.offset.reset' = 'earliest';` if err := client.Execute(offset); err != nil { panic(err) } s1 := ` CREATE OR REPLACE STREAM userEvents ( userId VARCHAR KEY, eventType VARCHAR ) WITH ( kafka_topic='user_events', value_format='json', partitions=8 ); ` if err := client.Execute(s1); err != nil { panic(err) }}func main() { http.HandleFunc("/emit", hello) http.ListenAndServe(":4201", nil)}func hello(w http.ResponseWriter, req *http.Request) { userId := req.URL.Query().Get("userId") if userId == "" { http.Error(w, "no userId", 400) return } userEvent := req.URL.Query().Get("event") if userEvent == "" { userEvent = "unknown" } err := client.Execute(fmt.Sprintf("INSERT INTO userEvents (userId, eventType) VALUES ('%s', '%s');", userId, userEvent)) if err != nil { http.Error(w, err.Error(), 500) return } w.WriteHeader(200) return}此應用程序創建一個數據流并公開一個端點以使用數據填充流。
1 回答

侃侃爾雅
TA貢獻1801條經驗 獲得超16個贊
首先,請注意我不再維護該客戶端,您可能想查看https://github.com/thmeitz/ksqldb-go。
現在回答你的問題。如果我理解正確,您希望出于并行目的運行同一邏輯使用者的多個實例,因此每條消息都應由該邏輯使用者處理一次。
如果是這種情況,那么您正在描述 Kafka 中所謂的消費者組。消費者的多個實例使用相同的客戶端 ID 標識自己,Kafka 確保來自源主題分區的數據被路由到該組中的可用消費者。如果有四個消費者和八個分區,每個消費者將從兩個分區獲取數據。如果一個消費者離開了該組(它崩潰,您縮小規模等),那么 Kafka 會將該消費者的分區重新分配給該組的剩余消費者。
這與您所看到的行為不同,您在其中有效地實例化了多個獨立的消費者。按照設計,Kafka 確保訂閱某個主題的每個消費者都能接收到該主題的所有消息。
我這里特意說的是Kafka,而不是ksqlDB。這是因為 ksqlDB 是建立在 Kafka 之上的,為了理解您所看到的內容,解釋基礎知識很重要。
要獲得您正在尋找的行為,您可能希望直接在您的消費者應用程序中使用消費者 API。您可以在此 Golang 和 Kafka 快速入門中查看消費者 API 的示例。要創建一個消費者組,您需要指定一個唯一的group.id
.
- 1 回答
- 0 關注
- 193 瀏覽
添加回答
舉報
0/150
提交
取消