我開發了幾個 C++ 應用程序,它們生成和使用嵌入 Protobuf3 消息的 Kafka 消息(使用 cppkafka)。兩者都工作正常。生產者的相關代碼是:std::string kafkaString;cppkafka::MessageBuilder *builder;...solidList->SerializeToString(&kafkaString);builder->payload(kafkaString);Protobuf 對象被序列化為字符串并作為 Kafka 負載插入。到目前為止一切正?!,F在,我正在嘗試用 Java 開發一個消費者。相關代碼應該是:KafkaConsumer<Long, String> consumer=new KafkaConsumer<Long, String>(properties);....ConsumerRecords<Long, String> records = consumer.poll(100); for (ConsumerRecord<Long, String> record : records) { SolidList solidList = SolidList.parseFrom(record.value()); ...但在編譯時失敗: parseFrom 抱怨:類型 Solidlist.SolidList 中的方法 parseFrom(ByteBuffer) 不適用于參數 (String)。所以,我嘗試使用 ByteBuffer:KafkaConsumer<Long, ByteBuffer> consumer=new KafkaConsumer<Long, ByteBuffer>(properties);....ConsumerRecords<Long, ByteBuffer> records = consumer.poll(100); for (ConsumerRecord<Long, ByteBuffer> record : records) { SolidList solidList = SolidList.parseFrom(record.value()); ...現在,錯誤在執行時間,仍然在 parseFrom(): Exception in thread "main" java.lang.ClassCastException: java.lang.String cannot be cast to java.nio.ByteBuffer。我知道它是一個 java.lang.String ?。?!所以,我回到原來的狀態,并嘗試將其用作字節數組: SolidList solidList = SolidList.parseFrom(record.value().getBytes());現在,錯誤出現在執行時間:線程“main” com.google.protobuf.InvalidProtocolBufferException$InvalidWireTypeException 中的異常:協議消息標記的線類型無效。.C++ 序列化的 protobuf 文檔說明:bool SerializeToString(string output) const;:序列化消息并將字節存儲在給定的字符串中。請注意,字節是二進制的,而不是文本;我們只使用字符串類作為方便的容器。*TL;DR:因此,我應該如何解釋 Java 中的 protobuf C++“二進制字節”?
2 回答
藍山帝景
TA貢獻1843條經驗 獲得超7個贊
嘗試實現一個反序列化器并將其作為值反序列化器傳遞給KafkaConsumer構造函數。它可能看起來像這樣:
class SolidListDeserializer implements Deserializer<SolidList> {
public SolidList deserialize(final String topic, byte[] data) {
return SolidList.parseFrom(data);
}
...
}
...
KafkaConsumer<Long, SolidList> consumer = new KafkaConsumer<>(props, new LongDeserializer(), new SolidListDeserializer())
DIEA
TA貢獻1820條經驗 獲得超3個贊
您可以將 kafka 讀作ConsumerRecords<Long, String>. 接著SolidList.parseFrom(ByteBuffer.wrap(record.value().getBytes("UTF-8")));
添加回答
舉報
0/150
提交
取消
