亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何從頭訂閱

如何從頭訂閱

Go
ITMISS 2023-06-12 09:33:26
我正在嘗試使用 GroupId 編寫一個 Kafka Consumer foo,它訂閱某個主題并從頭開始讀取(即使之前有偏移量)。我嘗試與重新平衡回調一起使用Subscribe,但它似乎從未被調用(已設置設置go.application)。有什么例子可以使這項工作成功嗎?
查看完整描述

2 回答

?
翻過高山走不出你

TA貢獻1875條經驗 獲得超3個贊

你可能只需要將你的值設置?auto.offset.resetkafka.OffsetBeginning.String()

package main


/**

?* Copyright 2016 Confluent Inc.

?*/


// consumer_example implements a consumer using the non-channel Poll() API

// to retrieve messages and events.


import (

? ? "fmt"

? ? "github.com/confluentinc/confluent-kafka-go/kafka"

? ? "os"

? ? "os/signal"

? ? "syscall"

)


func main() {


? ? broker := "YOUR_BROKER"

? ? group := "YOUR_GROUP"

? ? topics := "YOUR_TOPICS"

? ? sigchan := make(chan os.Signal, 1)

? ? signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)


? ? c, err := kafka.NewConsumer(&kafka.ConfigMap{

? ? ? ? "bootstrap.servers":? broker,

? ? ? ? "group.id":? ? ? ? ? ?group,

? ? ? ? "session.timeout.ms": 6000,

? ? ? ? "auto.offset.reset":? kafka.OffsetBeginning.String()})


? ? if err != nil {

? ? ? ? fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)

? ? ? ? os.Exit(1)

? ? }


? ? fmt.Printf("Created Consumer %v\n", c)


? ? err = c.SubscribeTopics(topics, nil)


? ? run := true


? ? for run == true {

? ? ? ? select {

? ? ? ? case sig := <-sigchan:

? ? ? ? ? ? fmt.Printf("Caught signal %v: terminating\n", sig)

? ? ? ? ? ? run = false

? ? ? ? default:

? ? ? ? ? ? ev := c.Poll(100)

? ? ? ? ? ? if ev == nil {

? ? ? ? ? ? ? ? continue

? ? ? ? ? ? }


? ? ? ? ? ? switch e := ev.(type) {

? ? ? ? ? ? case *kafka.Message:

? ? ? ? ? ? ? ? fmt.Printf("%% Message on %s:\n%s\n",

? ? ? ? ? ? ? ? ? ? e.TopicPartition, string(e.Value))

? ? ? ? ? ? ? ? if e.Headers != nil {

? ? ? ? ? ? ? ? ? ? fmt.Printf("%% Headers: %v\n", e.Headers)

? ? ? ? ? ? ? ? }

? ? ? ? ? ? case kafka.Error:

? ? ? ? ? ? ? ? // Errors should generally be considered as informational, the client will try to automatically recover

? ? ? ? ? ? ? ? fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)

? ? ? ? ? ? default:

? ? ? ? ? ? ? ? fmt.Printf("Ignored %v\n", e)

? ? ? ? ? ? }

? ? ? ? }

? ? }


? ? fmt.Printf("Closing consumer\n")

? ? c.Close()

}


查看完整回答
反對 回復 2023-06-12
?
慕碼人2483693

TA貢獻1860條經驗 獲得超9個贊

我們現在設置enable.auto.commitfalse. 這樣,就不會存儲偏移量,我們每次運行都從頭開始消費。



查看完整回答
反對 回復 2023-06-12
  • 2 回答
  • 0 關注
  • 195 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號