我對 Kafka 很陌生,在向生產者推送價值時收到此消息func Produce(topic string, key string, message interface{}) { headers := map[string][]byte{ MSG_HEADER_KEY_CORRELATIONID: []byte("1234"), MSG_HEADER_KEY_REQUESTID: []byte(uuid.NewString()), MSG_HEADER_KEY_TESTID: []byte("456"), MSG_HEADER_KEY_MESSAGETYPE: []byte("TestLookupRequest"), } kheaders := make([]kafka.Header, 0, len(headers)) for k, v := range headers { kheaders = append(kheaders, kafka.Header{Key: k, Value: v}) } var err error servers := "XXXXXX" protocol := "SASL_SSL" mechanisms := "PLAIN" username := "XXXXXXX" password := "XXXXXXX" Producer, err = kafka.NewProducer(&kafka.ConfigMap{ "bootstrap.servers": servers, "security.protocol": protocol, "sasl.username": username, "sasl.password": password, "sasl.mechanism": mechanisms, }) if err != nil { panic(err) } defer Producer.Close() value, _ := json.Marshal(message) err = Producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Key: []byte("12345"), Headers: kheaders, Value: value, Timestamp: time.Now().UTC(), TimestampType: kafka.TimestampCreateTime, }, nil) if err != nil { panic(err) } Producer.Flush(30)}%4|1641074998.615|終止|rdkafka#producer-1| [thrd:app]:生產者以 1 條消息(881 字節)仍在隊列或傳輸中終止:使用 flush() 等待未完成的消息傳遞有關如何解決此問題的任何幫助?
1 回答

滄海一幻覺
TA貢獻1824條經驗 獲得超5個贊
請嘗試更長的超時時間Flush()
;30 毫秒可能還不夠?;蛘邍L試使用本例中的通道: https ://github.com/confluentinc/confluent-kafka-go/blob/80c58f81b6cc32d3ed046609bf660a41a061b23d/examples/producer_example/producer_example.go
- 1 回答
- 0 關注
- 288 瀏覽
添加回答
舉報
0/150
提交
取消