我正在嘗試使用 Java 在 Eclipse 中創建一個 KStream 應用程序。現在我指的是互聯網上可用于 KStreams 的字數統計程序并對其進行修改。我想要的是我從輸入主題讀取的數據應該寫入文件而不是寫入另一個輸出主題。但是當我嘗試將 KStream/KTable 打印到本地文件時,我在輸出文件中得到以下條目:org.apache.kafka.streams.kstream.internals.KStreamImpl@4c203ea1如何實現將 KStream 的輸出重定向到文件?下面是代碼:package KStreamDemo.kafkatest;package org.apache.kafka.streams.examples.wordcount;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.kstream.KStream;import org.apache.kafka.streams.kstream.KTable;import org.apache.kafka.streams.kstream.KeyValueMapper;import org.apache.kafka.streams.kstream.Produced;import org.apache.kafka.streams.kstream.ValueMapper;import java.util.Arrays;import java.util.Locale;import java.util.Properties;import java.util.concurrent.CountDownLatch;public class TemperatureDemo {public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "34.73.184.104:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
1 回答

開心每一天1111
TA貢獻1836條經驗 獲得超13個贊
這是不正確的
System.out.println("OUTPUT:"+ counts);
您需要執行counts.foreach
,然后將消息打印到文件中。
將 Kafka 流輸入打印到控制臺?(只需更新以寫入文件)
但是,將流寫入主題可能更好。并使用 Kafka Connect 寫入文件。這是一種更符合行業標準的模式。鼓勵 Kafka Streams 僅在 Kafka 內的主題之間移動數據,而不是與外部系統(或文件系統)集成
使用您想要的主題信息進行編輯connect-file-sink.properties
,然后
bin/connect-standalone config/connect-file-sink.properties
添加回答
舉報
0/150
提交
取消