我是這個Apache Kafka主題的新手,我正在編寫一些基本的生產者 - 消費者代碼,并且我遇到了一些消費者代碼的問題,在啟動zookeeper和Kafka之后,我創建了一個主題名稱“firsttopic”,并且我正在使用CLI命令作為生產者輸入一些事件,并且作為消費者檢索這些事件,我已經使用Kafka-go編寫了一個go代碼,我在下面附加了該代碼以及我也面臨的錯誤。對于卡夫卡,我使用的是“github.com/segmentio/kafka-go”。func Startkafka() { conf := kafka.ReaderConfig{ Brokers: []string{"localhost:9092"}, Topic: "firsttopic", GroupID: "g1", MaxBytes: 10, } reader := kafka.NewReader(conf) for { m, err := reader.ReadMessage((context.Background())) if err != nil { fmt.Println("Some error occured", err) continue } fmt.Println("Message is : ", string(m.Value)) }}func main() { go Startkafka() fmt.Println("Kafka has been started...")}錯誤:卡夫卡已啟動...讀取 tcp 127.0.0.1:34858->127.0.1.1:9092 時發生某些錯誤:i/o 超時
3 回答

GCT1015
TA貢獻1827條經驗 獲得超4個贊
根據該錯誤,您應該從映射到本地主機的 /etc/hosts 中刪除 127.0.1.1,因為在客戶端連接到本地主機期間,它應該保持為 127.0.0.1
或者,改為連接到 127.0.0.1:9092

千巷貓影
TA貢獻1829條經驗 獲得超7個贊
在 u main 函數中,在 .然后,它在另一個堆棧中啟動并發運行,而主例程結束執行。StartKafka()go routine
刪除它并運行該應用程序。它將阻止應用程序并從卡夫卡主題消費。go routine
如果您需要為應用程序構建適當的正常關閉過程,則必須使用通道或其他內容。但以下代碼僅用于使用主題內容。
func main() {
fmt.Println("starting kafka...")
Startkafka()
}

撒科打諢
TA貢獻1934條經驗 獲得超2個贊
最大字節數:10 非常低,您應該增加它,因為這可能會導致問題。代碼的其余部分似乎是正確的。您應該首先嘗試使用 Kafka 安裝附帶的 Kafka 使用者進行連接,無論它是否啟動,這也會驗證您正在使用的連接字符串。
- 3 回答
- 0 關注
- 130 瀏覽
添加回答
舉報
0/150
提交
取消