我嘗試使用 kafka 生產者發送 java 字符串消息。字符串消息是從Java Spark JavaPairDStream中提取的。JavaPairDStream<String, String> processedJavaPairStream = inputStream.mapToPair (record-> new Tuple2<>(record.key(), record.value())).mapValues(message -> message.replace('>', '#'));String outTopics = "outputTopic";String broker = "localhost:9092";Properties properties = new Properties();properties.put("bootstrap.servers", broker);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<String, String>(properties);processedJavaPairStream.foreachRDD(rdd -> rdd.foreach(tuple2 -> { ProducerRecord<String, String> message = new ProducerRecord<String, String>(outTopics, tuple2._1, tuple2._2); System.out.println(message.key() + " : " + message.value()); //(1) producer.send(message).get(); //(2)}));(1) 行正確打印消息字符串。但是當我使用 kafka 生產者發送這些消息(如(2)行)時,它會拋出如下異常,我無法理解這個異常。我確認 kafaka 生產者消息是<String,String>通過第 (1) 行輸入的。但為什么第(2)行會拋出這個異常呢?我是否錯過任何流程?
1 回答

動漫人物
TA貢獻1815條經驗 獲得超10個贊
您需要為每個 RDD 創建生產者。
RDD 分布在多個執行器上,Producer 對象無法序列化以在它們之間共享
或者,查看結構化流的文檔,您可以簡單地執行此操作以寫入主題;無需自己創建和發送記錄
stream.writeStream().format("kafka")...
請注意,如果目標只是將一個主題映射到另一個主題,那么Kafka Streams API
比 Spark 更簡單且開銷更少
添加回答
舉報
0/150
提交
取消