所以我試圖將 Kafka 用于我的應用程序,它有一個生產者將操作記錄到 Kafka MQ 中,而消費者從 MQ 中讀取它。由于我的應用程序在 Go 中,我使用 Shopify Sarama 使這成為可能?,F在,我可以讀取 MQ 并使用fmt.Printf然而,我真的希望錯誤處理比控制臺打印更好,我愿意加倍努力?,F在用于消費者連接的代碼:mqCfg := sarama.NewConfig()master, err := sarama.NewConsumer([]string{brokerConnect}, mqCfg)if err != nil { panic(err) // Don't want to panic when error occurs, instead handle it}以及消息的處理: go func() { defer wg.Done() for message := range consumer.Messages() { var msgContent Message _ = json.Unmarshal(message.Value, &msgContent) fmt.Printf("Reading message of type %s with id : %d\n", msgContent.Type, msgContent.ContentId) //Don't want to print it }}()我的問題(我是測試 Kafka 的新手,一般來說也是 kafka 的新手):上面程序中哪里會出現錯誤,以便我可以處理它們?任何示例代碼對我來說都是很好的開始。我能想到的錯誤條件是 msgContent 在 JSON 中并不真正包含任何類型的 ContentId 字段。在 kafka 中,是否存在消費者嘗試以當前偏移量讀取但由于某種原因無法讀取的情況(即使 JSON 格式正確)?我的消費者是否有可能回溯在失敗的偏移讀取上方說 x 步并重新處理偏移?或者有沒有更好的方法來做到這一點?再次,這些情況會是什么?我愿意閱讀和嘗試。
- 1 回答
- 0 關注
- 369 瀏覽
添加回答
舉報
0/150
提交
取消