1 回答

TA貢獻1827條經驗 獲得超9個贊
這是我的問題的解決方案。我的 goroutine 阻塞了 main 函數,需要把它們拆分出來。如果下面的代碼不容易看懂,這里是我正在修改的程序的鏈接:https://github.com/buger/goreplay 。如果我能得到所有者的回復,我計劃清理代碼并提交拉取請求,或者可能發布一個分叉。
package main
import (
"context"
"encoding/json"
"strings"
"os"
"log"
"github.com/Shopify/sarama"
)
// KafkaInput is used for receiving Kafka messages and
// transforming them into HTTP payloads.
//
// It embeds sarama.ConsumerGroup, so the group's methods (such as Consume)
// can be called directly on a KafkaInput.
type KafkaInput struct {
sarama.ConsumerGroup
// config supplies the host, group, topic and useJSON settings read by this input.
config *KafkaConfig
// consumer owns the channel that Push writes consumed messages to.
consumer Consumer
// messages is the channel Read receives from.
messages chan *sarama.ConsumerMessage
}
// Consumer represents a Sarama consumer group consumer.
type Consumer struct {
// ready is a signalling channel; nothing in the code shown here sends on or
// receives from it.
ready chan bool
// messages carries the Kafka messages handed over by KafkaInput.Push.
messages chan *sarama.ConsumerMessage
}
// Package-level consumer settings. In the code shown here only oldest and
// verbose are read (both by NewKafkaInput); the other variables are not read.
var (
brokers = ""
version = ""
group = ""
topics = ""
assignor = ""
// oldest selects sarama.OffsetOldest as the initial consumer offset when true.
oldest = true
// verbose routes Sarama's internal logger to stdout when true.
verbose = false
)
// NewKafkaInput creates an instance of the Kafka consumer client.
//
// It builds a Sarama configuration pinned to Kafka version 2.1.1 (starting
// from the oldest offset when the package-level oldest flag is set), creates
// a consumer group client for config.group against the comma-separated broker
// list in config.host, and starts a background goroutine that consumes
// config.topic. The address parameter is not used.
//
// It panics if the Kafka version cannot be parsed or if the consumer group
// client cannot be created.
func NewKafkaInput(address string, config *KafkaConfig) *KafkaInput {
	c := sarama.NewConfig()

	log.Printf("KafkaConfig: %s", config.host)
	log.Printf("KafkaConfig: %s", config.group)
	log.Printf("KafkaConfig: %s", config.topic)
	log.Println("Starting a new Sarama consumer")

	if verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	// The Kafka cluster version has to be defined before the consumer group
	// is initialized.
	kafkaVersion, err := sarama.ParseKafkaVersion("2.1.1")
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}
	c.Version = kafkaVersion

	if oldest {
		c.Consumer.Offsets.Initial = sarama.OffsetOldest
	}

	cg, err := sarama.NewConsumerGroup(strings.Split(config.host, ","), config.group, c)
	if err != nil {
		// Without this check a nil group would be embedded in the input and
		// the consume goroutine would crash on its first call.
		log.Panicf("Error creating consumer group client: %v", err)
	}

	consumer := Consumer{
		ready:    make(chan bool),
		messages: make(chan *sarama.ConsumerMessage, 256),
	}

	// Read and Push must share one channel, so the input's messages field is
	// pointed at the consumer's channel instead of allocating a second one.
	i := &KafkaInput{
		ConsumerGroup: cg,
		config:        config,
		consumer:      consumer,
		messages:      consumer.messages,
	}

	go i.loop([]string{config.topic})

	return i
}
// ConsumeClaim receives from the claim's message channel until that channel
// is closed. Each message is first marked on the session (with empty
// metadata) and then handed to Push. It always returns nil.
func (i *KafkaInput) ConsumeClaim(s sarama.ConsumerGroupSession, c sarama.ConsumerGroupClaim) error {
	incoming := c.Messages()
	for {
		record, open := <-incoming
		if !open {
			return nil
		}
		s.MarkMessage(record, "")
		i.Push(record)
	}
}
// loop keeps the consumer group session alive for the given topics, using the
// input itself as the group handler. Consume is called again each time it
// returns without error; the Sarama documentation describes calling it in a
// loop so that a new session is created after a rebalance. The first error
// is logged and ends the loop.
//
// The context is context.Background(), so the loop is never cancelled from
// here; it only stops when Consume reports an error.
func (i *KafkaInput) loop(topic []string) {
	ctx := context.Background()
	for {
		if err := i.Consume(ctx, topic, i); err != nil {
			log.Printf("Kafka consumer group stopped consuming %v: %v", topic, err)
			return
		}
	}
}
// Push logs the message's topic, partition and offset and sends the message
// on the consumer's messages channel. The send blocks while the channel's
// buffer is full.
//
// A nil message, or a consumer whose messages channel is nil, makes Push a
// no-op.
func (i *KafkaInput) Push(m *sarama.ConsumerMessage) {
	if m == nil || i.consumer.messages == nil {
		return
	}
	// Log identifying fields rather than formatting the whole struct pointer
	// with %s, which is not a valid verb for its numeric fields.
	log.Printf("MSGPUSH: topic=%s partition=%d offset=%d", m.Topic, m.Partition, m.Offset)
	i.consumer.messages <- m
}
// Read blocks until a Kafka message is available and copies its payload into
// data.
//
// When config.useJSON is false the raw message value is copied. Otherwise the
// value is decoded as a JSON KafkaMessage and the result of its Dump method
// is copied instead.
//
// It returns the number of bytes actually written to data, which is smaller
// than the payload when data is too short to hold it. A JSON decoding error
// or a Dump error is logged and returned with a count of 0.
func (i *KafkaInput) Read(data []byte) (int, error) {
	message := <-i.messages
	log.Printf("Msg: %s", string(message.Value))

	if !i.config.useJSON {
		// copy reports the bytes written, which never exceeds len(data).
		return copy(data, message.Value), nil
	}

	var kafkaMessage KafkaMessage
	if err := json.Unmarshal(message.Value, &kafkaMessage); err != nil {
		log.Println("Failed to decode Kafka message:", err)
		return 0, err
	}

	buf, err := kafkaMessage.Dump()
	if err != nil {
		log.Println("Failed to decode access log entry:", err)
		return 0, err
	}

	return copy(data, buf), nil
}
// String returns a human-readable label for the input, built from the
// configured host and topic.
func (i *KafkaInput) String() string {
	cfg := i.config
	label := "Kafka Input: " + cfg.host
	return label + "/" + cfg.topic
}
// Setup is the session-start hook, run before ConsumeClaim. This input has
// nothing to prepare, so it ignores the session and returns nil.
func (i *KafkaInput) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup is the session-end hook, run once all ConsumeClaim goroutines have
// exited. This input has nothing to release, so it ignores the session and
// returns nil.
func (i *KafkaInput) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
- 1 回答
- 0 關注
- 543 瀏覽
添加回答
舉報