我正在嘗試使用 Golang 客戶端測試生產者將消息寫入 kafka 集群上的主題。package mainimport (? ? "fmt"? ? "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka")func main() {? ? p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers":"localhost"})? ? if err != nil {? ? ? ? panic(err)? ? }? ? defer p.Close()? ? // Delivery report handler for produced messages? ? go func() {? ? ? ? for e := range p.Events() {? ? ? ? ? ? switch ev := e.(type) {? ? ? ? ? ? case *kafka.Message:? ? ? ? ? ? ? ? if ev.TopicPartition.Error != nil {? ? ? ? ? ? ? ? ? ? fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)? ? ? ? ? ? ? ? } else {? ? ? ? ? ? ? ? ? ? fmt.Printf("Delivered message to %v\n", ev.TopicPartition)? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? }? ? }()? ? // Produce messages to topic (asynchronously)? ? topic := "test"? ? for _, word := range []string{"test message"} {? ? ? ? p.Produce(&kafka.Message{? ? ? ? ? ? TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},? ? ? ? ? ? Value:? ? ? ? ? []byte(word),? ? ? ? }, nil)? ? }? ? // Wait for message deliveries before shutting down? ? p.Flush(15 * 1000)}我在控制臺上收到消息-消費者沒有問題。然后,我嘗試做同樣的事情,只是使用我的遠程 kafka 集群主題(注意我也嘗試過不使用字符串中的端口):p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers":"HOSTNAME.amazonaws.com:9092,HOSTNAME2.amazonaws.com:9092,HOSTNAME3.amazonaws.com:9092"})它打印以下錯誤:Delivery failed: test[0]@end(Broker: Not enough in-sync replicas)不過控制臺生產者沒有任何問題:./bin/kafka-console-producer.sh --broker-list HOSTNAME.amazonaws.com:9092,HOSTNAME2.amazonaws.com:9092,HOSTNAME3.amazonaws.com:9092 --topic test>proving that this works控制臺消費者收到它:bin/kafka-console-consumer.sh --bootstrap-server HOSTNAME.amazonaws.com:9092,HOSTNAME2.amazonaws.com:9092,HOSTNAME3.amazonaws.com:9092 --topic test --from-beginning?proving that this works我做的最后一件事是檢查該主題有多少個同步副本。如果我沒看錯的話,最小值應該是 2,而且有 3 個。我還有什么可以研究的想法嗎?
2 回答

莫回無
TA貢獻1865條經驗 獲得超7個贊
您有min.insync.replicas=2
,但該主題只有一個副本。
我相信控制臺制作人只將該屬性設置為 1
有 3 個
其實只有一個。這是代理 ID 3。如果實際上有三個副本,您會看到總共三個單獨的數字作為 ISR
- 2 回答
- 0 關注
- 181 瀏覽
添加回答
舉報
0/150
提交
取消