亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Apache Spark 可以使用 TCP 偵聽器作為輸入嗎?

Apache Spark 可以使用 TCP 偵聽器作為輸入嗎?

12345678_0001 2023-06-21 16:05:24
Apache Spark 可以使用 TCP 偵聽器作為輸入嗎?如果是,也許有人有執行該操作的 java 代碼示例。我試圖找到關于此的示例,但所有教程都展示了如何通過 TCP 定義到數據服務器的輸入連接,而不是使用等待傳入數據的 TCP 偵聽器。
查看完整描述

2 回答

?
繁星點點滴滴

TA貢獻1803條經驗 獲得超3個贊

是的,可以使用 Spark 監聽 TCP 端口并處理任何傳入數據。您正在尋找的是Spark Streaming。

為了方便:

import org.apache.spark.*;

import org.apache.spark.api.java.function.*;

import org.apache.spark.streaming.*;

import org.apache.spark.streaming.api.java.*;

import scala.Tuple2;


// Create a local StreamingContext with two working thread and batch interval of 1 second

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));


// Create a DStream that will connect to hostname:port, like localhost:9999

JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);


// Split each line into words

JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());


// Count each word in each batch

JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));

JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);


// Print the first ten elements of each RDD generated in this DStream to the console

wordCounts.print();


jssc.start();? ? ? ? ? ? ? // Start the computation

jssc.awaitTermination();? ?// Wait for the computation to terminate


查看完整回答
反對 回復 2023-06-21
?
慕姐8265434

TA貢獻1813條經驗 獲得超2個贊

Spark沒有內置的TCP服務器來等待生產者和緩沖數據。Spark 通過其 API 庫在 TCP、Kafka 等的輪詢機制上工作。要使用傳入的 TCP 數據,您需要有一個 Spark 可以連接到的外部 TCP 服務器,如 Shaido 在示例中所解釋的那樣。



查看完整回答
反對 回復 2023-06-21
  • 2 回答
  • 0 關注
  • 172 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號