2 回答

TA貢獻1875條經驗 獲得超3個贊
你可能只需要將你的值設置?auto.offset.reset
為kafka.OffsetBeginning.String()
:
package main
/**
?* Copyright 2016 Confluent Inc.
?*/
// consumer_example implements a consumer using the non-channel Poll() API
// to retrieve messages and events.
import (
? ? "fmt"
? ? "github.com/confluentinc/confluent-kafka-go/kafka"
? ? "os"
? ? "os/signal"
? ? "syscall"
)
func main() {
? ? broker := "YOUR_BROKER"
? ? group := "YOUR_GROUP"
? ? topics := "YOUR_TOPICS"
? ? sigchan := make(chan os.Signal, 1)
? ? signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
? ? c, err := kafka.NewConsumer(&kafka.ConfigMap{
? ? ? ? "bootstrap.servers":? broker,
? ? ? ? "group.id":? ? ? ? ? ?group,
? ? ? ? "session.timeout.ms": 6000,
? ? ? ? "auto.offset.reset":? kafka.OffsetBeginning.String()})
? ? if err != nil {
? ? ? ? fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
? ? ? ? os.Exit(1)
? ? }
? ? fmt.Printf("Created Consumer %v\n", c)
? ? err = c.SubscribeTopics(topics, nil)
? ? run := true
? ? for run == true {
? ? ? ? select {
? ? ? ? case sig := <-sigchan:
? ? ? ? ? ? fmt.Printf("Caught signal %v: terminating\n", sig)
? ? ? ? ? ? run = false
? ? ? ? default:
? ? ? ? ? ? ev := c.Poll(100)
? ? ? ? ? ? if ev == nil {
? ? ? ? ? ? ? ? continue
? ? ? ? ? ? }
? ? ? ? ? ? switch e := ev.(type) {
? ? ? ? ? ? case *kafka.Message:
? ? ? ? ? ? ? ? fmt.Printf("%% Message on %s:\n%s\n",
? ? ? ? ? ? ? ? ? ? e.TopicPartition, string(e.Value))
? ? ? ? ? ? ? ? if e.Headers != nil {
? ? ? ? ? ? ? ? ? ? fmt.Printf("%% Headers: %v\n", e.Headers)
? ? ? ? ? ? ? ? }
? ? ? ? ? ? case kafka.Error:
? ? ? ? ? ? ? ? // Errors should generally be considered as informational, the client will try to automatically recover
? ? ? ? ? ? ? ? fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
? ? ? ? ? ? default:
? ? ? ? ? ? ? ? fmt.Printf("Ignored %v\n", e)
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? fmt.Printf("Closing consumer\n")
? ? c.Close()
}
- 2 回答
- 0 關注
- 195 瀏覽
添加回答
舉報