亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Sarama Kafka 消費者組函數返回

Sarama Kafka 消費者組函數返回

Go
呼喚遠方 2022-04-26 15:24:29
我對 Go Lang 非常陌生,并試圖對使用 Sarama 庫從 Kafka 消費消息的開源庫進行一些調整。原始代碼可以在這里找到。原始包實現了一個 PartitionConsumer,如果不需要跨多個消費者使用同一主題的讀取一致性,它就可以正常工作,但是,這對我不起作用。我在同一個應用程序中做了一些工作,使用我在網上找到的一些示例來實現 sarama NewConsumerGroup 包。KafkaConfig 為消費者攜帶 groupID 和 Topic。當我運行這個程序時,消費者啟動并使用正確的組從正確的主題中讀取,并使用在此函數中創建的 ConsumerClaim 將其打印到 STDOUT:func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {    for message := range claim.Messages() {        log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)        session.MarkMessage(message, "")    }    return nil}然而,我認為我需要的是NewKafkaInput函數返回*KafkaInput添加到結構的聲明中的消息(如果我在這里使用了錯誤的術語,請原諒我,這是我的第一個 Go 牛仔競技表演)。... i := &KafkaInput{        config: config,        // consumers: make([]sarama.PartitionConsumer, len(partitions)),        // messages:  make(chan *sarama.ConsumerMessage, 256),        messages: make(chan *sarama.ConsumerMessage, 256),    }    return i}在此處完成的原始示例中:func NewKafkaInput(address string, config *KafkaConfig) *KafkaInput {    ...    go func(consumer sarama.PartitionConsumer) {                defer consumer.Close()                for message := range consumer.Messages() {                    i.messages <- message                }            }(consumer)    ...}我花了好幾天的時間來研究將函數移入和移出NewKafakInput函數,嘗試將消息添加到KafakInput函數外部的結構以及介于兩者之間的所有內容。我只是無法讓它工作。該NewKafakInput函數需要返回*KafkaInput任何消息,以便此函數可以完成:func (i *KafkaInput) Read(data []byte) (int, error) {    message := <-i.messages    if !i.config.useJSON {        copy(data, message.Value)        return len(message.Value), nil    }    var kafkaMessage KafkaMessage    json.Unmarshal(message.Value, &kafkaMessage)    buf, err := kafkaMessage.Dump()    if err != nil {        log.Println("Failed to decode access log entry:", err)        return 0, err    }    copy(data, buf)    return len(buf), nil}完全有可能我也把這件事弄得一團糟,但感謝任何幫助和輸入。
查看完整描述

1 回答

?
素胚勾勒不出你

TA貢獻1827條經驗 獲得超9個贊

這是我的問題的解決方案。我有 goroutines 阻塞了主要功能,他們需要被打破。如果下面的代碼沒有任何意義,這里是我正在修改的程序的鏈接:https ://github.com/buger/goreplay 。如果我能得到所有者的回復,我計劃清理代碼并提交拉取請求,或者可能發布一個分叉。


package main


import (

    "context"

    "encoding/json"

    "strings"


    "os"


    "log"


    "github.com/Shopify/sarama"

)


// KafkaInput is used for recieving Kafka messages and

// transforming them into HTTP payloads.

type KafkaInput struct {

    sarama.ConsumerGroup

    config   *KafkaConfig

    consumer Consumer

    messages chan *sarama.ConsumerMessage

}


// Consumer represents a Sarama consumer group consumer

type Consumer struct {

    ready    chan bool

    messages chan *sarama.ConsumerMessage

}


var (

    brokers  = ""

    version  = ""

    group    = ""

    topics   = ""

    assignor = ""

    oldest   = true

    verbose  = false

)


// NewKafkaInput creates instance of kafka consumer client.

func NewKafkaInput(address string, config *KafkaConfig) *KafkaInput {

    /**

     * Construct a new Sarama configuration.

     * The Kafka cluster version has to be defined before the consumer/producer is initialized.

     */

    c := sarama.NewConfig()

    // Configuration options go here


    log.Printf("KafkaConfig: %s", config.host)

    log.Printf("KafkaConfig: %s", config.group)

    log.Printf("KafkaConfig: %s", config.topic)


    log.Println("Starting a new Sarama consumer")


    if verbose {

        sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

    }


    version, err := sarama.ParseKafkaVersion("2.1.1")

    if err != nil {

        log.Panicf("Error parsing Kafka version: %v", err)

    }


    c.Version = version


    if oldest {

        c.Consumer.Offsets.Initial = sarama.OffsetOldest

    }


    group, err := sarama.NewConsumerGroup(strings.Split(config.host, ","), config.group, c)


    /**

     * Setup a new Sarama consumer group

     */

    consumer := Consumer{

        ready:    make(chan bool),

        messages: make(chan *sarama.ConsumerMessage, 256),

    }


    i := &KafkaInput{

        ConsumerGroup: group,

        config:        config,

        messages:      make(chan *sarama.ConsumerMessage, 256),

        consumer:      consumer,

    }


    go i.loop([]string{config.topic})

    i.messages = consumer.messages

    return i

}


//ConsumeClaim and stuff

func (i *KafkaInput) ConsumeClaim(s sarama.ConsumerGroupSession, c sarama.ConsumerGroupClaim) error {

    for msg := range c.Messages() {

        s.MarkMessage(msg, "")

        i.Push(msg)

    }

    return nil

}


func (i *KafkaInput) loop(topic []string) {

    ctx := context.Background()

    for {

        if err := i.Consume(ctx, []string{i.config.topic}, i); err != nil {

            return

        }

    }

}


// Push Messages

func (i *KafkaInput) Push(m *sarama.ConsumerMessage) {

    if i.consumer.messages != nil {

        log.Printf("MSGPUSH: %s", m)

        i.consumer.messages <- m

    }

}


func (i *KafkaInput) Read(data []byte) (int, error) {


    message := <-i.messages

    log.Printf("Msg: %s", string(message.Value))

    if !i.config.useJSON {

        copy(data, message.Value)

        return len(message.Value), nil

    }


    var kafkaMessage KafkaMessage

    json.Unmarshal(message.Value, &kafkaMessage)


    buf, err := kafkaMessage.Dump()

    if err != nil {

        log.Println("Failed to decode access log entry:", err)

        return 0, err

    }


    copy(data, buf)


    return len(buf), nil


}


func (i *KafkaInput) String() string {

    return "Kafka Input: " + i.config.host + "/" + i.config.topic

}


// Setup is run at the beginning of a new session, before ConsumeClaim

func (i *KafkaInput) Setup(s sarama.ConsumerGroupSession) error {

    return nil

}


// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (i *KafkaInput) Cleanup(s sarama.ConsumerGroupSession) error {

    return nil

}


查看完整回答
反對 回復 2022-04-26
  • 1 回答
  • 0 關注
  • 543 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號