亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何在 Flink 程序中打印 kafka 主題數據?

如何在 Flink 程序中打印 kafka 主題數據?

函數式編程 2021-12-30 19:43:20
我通過這個指令創建了一個主題:C:\kafka_2.12-0.10.2.1>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test < C:\User11\Desktop\Data.csv然后我測試了該主題是否正確使用了該數據。之后想在Flink程序中打印topic,我的程序是這樣的: try{    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    Properties properties = new Properties();    DataStream<String> stream = env            .addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(),properties));           stream.print();    env.execute();    } catch (Exception e) {        e.printStackTrace();    }但是我得到了這個信息(因為信息太長我不得不寫一些):[main] INFO org.apache.flink.streaming.api.environment.LocalStreamEnvironment - 在本地嵌入式 Flink mini 集群上運行作業 [main] INFO org.apache.flink.runtime.minicluster.MiniCluster - 啟動 Flink Mini Cluster [main] INFO org.apache.flink.runtime.minicluster.MiniCluster - 啟動指標注冊表 [main] INFO org.apache.flink.runtime.metrics.MetricRegistryImpl - 沒有配置指標報告器,不會公開/報告指標。[main] INFO org.apache.flink.runtime.minicluster.MiniCluster - 啟動 RPC 服務 [flink-akka.actor.default-dispatcher-2] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger 啟動 [main] INFO org.apache.flink.runtime.minicluster.MiniCluster - 啟動高可用性服務 [main] INFO org.apache.flink.runtime.blob.BlobServer - 創建 BLOB 服務器存儲目錄 C:另外,我也看到了這個鏈接,但它沒有解決我的問題: How to access/read kafka topic data from flink?你能告訴我這里有什么問題嗎?
查看完整描述

1 回答

?
白衣非少年

TA貢獻1155條經驗 獲得超0個贊

問題解決了。首先,我用這個命令填滿了 Kafka 主題:


/home/kafka_2.11-2.0.0/bin/kafka-console-producer.sh --broker-list 10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092 --topic flinkTopic < transactions2.csv

然后,使用此代碼,我可以打印 Kafka 主題:


 final StreamExecutionEnvironment env = 

 StreamExecutionEnvironment.getExecutionEnvironment();

 Properties prop = new Properties();

 prop.setProperty("bootstrap.servers", 

 "10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092");

 prop.setProperty("group.id", "test");

    FlinkKafkaConsumer<String> myConsumer= new FlinkKafkaConsumer<> 

  ("flinkTopic", new SimpleStringSchema(),prop);

    myConsumer.setStartFromEarliest();

    DataStream<String> stream = env.addSource(myConsumer);

    stream.print();

    env.execute("Flink Streaming Java API Skeleton");

我希望它對其他人有用。


查看完整回答
反對 回復 2021-12-30
  • 1 回答
  • 0 關注
  • 257 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號