亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

引起:java.io.NotSerializedException:org.apache.kafka

引起:java.io.NotSerializedException:org.apache.kafka

慕沐林林 2023-08-04 16:34:33
我嘗試使用 kafka 生產者發送 java 字符串消息。字符串消息是從Java Spark JavaPairDStream中提取的。JavaPairDStream<String, String> processedJavaPairStream = inputStream.mapToPair                 (record-> new Tuple2<>(record.key(), record.value())).mapValues(message -> message.replace('>', '#'));String outTopics = "outputTopic";String broker = "localhost:9092";Properties properties = new Properties();properties.put("bootstrap.servers", broker);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<String, String>(properties);processedJavaPairStream.foreachRDD(rdd -> rdd.foreach(tuple2 -> {    ProducerRecord<String, String> message = new ProducerRecord<String, String>(outTopics, tuple2._1, tuple2._2);    System.out.println(message.key() + " : " + message.value()); //(1)    producer.send(message).get(); //(2)}));(1) 行正確打印消息字符串。但是當我使用 kafka 生產者發送這些消息(如(2)行)時,它會拋出如下異常,我無法理解這個異常。我確認 kafaka 生產者消息是<String,String>通過第 (1) 行輸入的。但為什么第(2)行會拋出這個異常呢?我是否錯過任何流程?
查看完整描述

1 回答

?
動漫人物

TA貢獻1815條經驗 獲得超10個贊

您需要為每個 RDD 創建生產者。

RDD 分布在多個執行器上,Producer 對象無法序列化以在它們之間共享


或者,查看結構化流的文檔,您可以簡單地執行此操作以寫入主題;無需自己創建和發送記錄

stream.writeStream().format("kafka")...

請注意,如果目標只是將一個主題映射到另一個主題,那么Kafka Streams API

比 Spark 更簡單且開銷更少


查看完整回答
反對 回復 2023-08-04
  • 1 回答
  • 0 關注
  • 172 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號