亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Kafka Protobuf:C++ 序列化到 Java

Kafka Protobuf:C++ 序列化到 Java

元芳怎么了 2021-08-06 11:18:33
我開發了幾個 C++ 應用程序,它們生成和使用嵌入 Protobuf3 消息的 Kafka 消息(使用 cppkafka)。兩者都工作正常。生產者的相關代碼是:std::string kafkaString;cppkafka::MessageBuilder *builder;...solidList->SerializeToString(&kafkaString);builder->payload(kafkaString);Protobuf 對象被序列化為字符串并作為 Kafka 負載插入。到目前為止一切正?!,F在,我正在嘗試用 Java 開發一個消費者。相關代碼應該是:KafkaConsumer<Long, String> consumer=new KafkaConsumer<Long, String>(properties);....ConsumerRecords<Long, String> records = consumer.poll(100);  for (ConsumerRecord<Long, String> record : records) {    SolidList solidList = SolidList.parseFrom(record.value());    ...但在編譯時失敗: parseFrom 抱怨:類型 Solidlist.SolidList 中的方法 parseFrom(ByteBuffer) 不適用于參數 (String)。所以,我嘗試使用 ByteBuffer:KafkaConsumer<Long, ByteBuffer> consumer=new KafkaConsumer<Long, ByteBuffer>(properties);....ConsumerRecords<Long, ByteBuffer> records = consumer.poll(100);  for (ConsumerRecord<Long, ByteBuffer> record : records) {    SolidList solidList = SolidList.parseFrom(record.value());    ...現在,錯誤在執行時間,仍然在 parseFrom(): Exception in thread "main" java.lang.ClassCastException: java.lang.String cannot be cast to java.nio.ByteBuffer。我知道它是一個 java.lang.String ?。?!所以,我回到原來的狀態,并嘗試將其用作字節數組:    SolidList solidList = SolidList.parseFrom(record.value().getBytes());現在,錯誤出現在執行時間:線程“main” com.google.protobuf.InvalidProtocolBufferException$InvalidWireTypeException 中的異常:協議消息標記的線類型無效。.C++ 序列化的 protobuf 文檔說明:bool SerializeToString(string output) const;:序列化消息并將字節存儲在給定的字符串中。請注意,字節是二進制的,而不是文本;我們只使用字符串類作為方便的容器。*TL;DR:因此,我應該如何解釋 Java 中的 protobuf C++“二進制字節”?
查看完整描述

2 回答

?
藍山帝景

TA貢獻1843條經驗 獲得超7個贊

嘗試實現一個反序列化器并將其作為值反序列化器傳遞給KafkaConsumer構造函數。它可能看起來像這樣:


class SolidListDeserializer implements Deserializer<SolidList> {

  public SolidList deserialize(final String topic, byte[] data) {

    return SolidList.parseFrom(data);

  }

  ...

}


...


KafkaConsumer<Long, SolidList> consumer = new KafkaConsumer<>(props, new LongDeserializer(), new SolidListDeserializer())



查看完整回答
反對 回復 2021-08-06
?
DIEA

TA貢獻1820條經驗 獲得超3個贊

您可以將 kafka 讀作ConsumerRecords<Long, String>. 接著SolidList.parseFrom(ByteBuffer.wrap(record.value().getBytes("UTF-8")));


查看完整回答
反對 回復 2021-08-06
  • 2 回答
  • 0 關注
  • 393 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號