亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

以云事件格式將數據發送到 Kafka 主題

以云事件格式將數據發送到 Kafka 主題

Go
米脂 2022-10-04 16:11:21
現在我有這個代碼,它工作正常。(它向卡夫卡主題發送一些 json 格式的數據)j, err := json.Marshal(data)if err != nil {    log.Fatal(err)}msg := &sarama.ProducerMessage{    Topic: tName,    Value: sarama.StringEncoder(j),}_, _, err = producer.SendMessage(msg)但有人希望以云事件格式擁有這些數據。->https://github.com/cloudevents/sdk-go 所以我該怎么辦,因為這個事件結構不能直接轉換為字符串。type Event struct {    Context     EventContext    DataEncoded []byte    // DataBase64 indicates if the event, when serialized, represents    // the data field using the base64 encoding.    // In v0.3, this field is superseded by DataContentEncoding    DataBase64  bool    FieldErrors map[string]error}所以這段代碼甚至不會編譯。j, err := json.Marshal(data)if err != nil {    log.Fatal(err)}//...event := cloudevents.NewEvent()event.SetSource("example/uri") event.SetType("example.type")event.SetData(cloudevents.ApplicationJSON, j)producerMsg := &sarama.ProducerMessage{    Topic: s.outputTopic,    Value: sarama.StringEncoder(event),}_, _, err = s.producer.SendMessage(producerMsg)我應該怎么做才能將此活動發送給卡夫卡?嘗試投射事件。數據編碼為字符串或類似的東西?順便說一句。編程語言是戈蘭語。
查看完整描述

1 回答

?
揚帆大魚

TA貢獻1799條經驗 獲得超9個贊

您是否看到序列化事件的文檔部分?


https://github.com/cloudevents/sdk-go#serializedeserialize-a-cloudevent


event := cloudevents.NewEvent()

event.SetSource("example/uri") 

event.SetType("example.type") 

// data here is a map[string] interface{}, or some other Struct type representing the "example.type" schema type above 

event.SetData(cloudevents.ApplicationJSON, data)


bytes, err := json.Marshal(event)

if err != nil {

  log.Fatal(err)

}

producerMsg := &sarama.ProducerMessage{

    Topic: s.outputTopic,

    Value: bytes,  // you've already encoded the event 

}

否則,請務必查看提供的使用 CloudEvent 客戶端的示例代碼 https://github.com/cloudevents/sdk-go/blob/main/samples/kafka/sender/main.go


sender, err := kafka_sarama.NewSender([]string{"127.0.0.1:9092"}, saramaConfig, "test-topic")

if err != nil {

    log.Fatalf("failed to create protocol: %s", err.Error())

}


defer sender.Close(context.Background())


c, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())

if err != nil {

    log.Fatalf("failed to create client, %v", err)

}


event := cloudevents.NewEvent() 

event.Set... 


c.Send(..., event)


... 


查看完整回答
反對 回復 2022-10-04
  • 1 回答
  • 0 關注
  • 74 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號