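I am using Spark 2.1.0, Scala 2.11.0 and Hadoop 2.7.3. Open IDEA and create a new Maven project.
Then look up the Maven coordinates for Spark on the official Spark website. For example, the Kafka integration (and the other streaming sources) are listed as: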
Source    Artifact
Kafka     spark-streaming-kafka-0-8_2.11
Flume     spark-streaming-flume_2.11
Kinesis   spark-streaming-kinesis-asl_2.11 [Amazon Software License]
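The `_2.11` suffix on each artifact is the Scala binary version (for Scala 2.x, the major.minor part of the full version) that the artifact was compiled against. That is why a Maven property holding `2.11`, not `2.11.0`, can be used as the suffix. A minimal sketch of that relationship; `ArtifactNames`, `binaryVersion` and `artifactId` are hypothetical helpers for illustration, not part of Spark or Maven:

```scala
object ArtifactNames {
  // Scala 2.x binary version = first two components of the full version,
  // e.g. "2.11.0" and "2.11.8" both map to "2.11"
  def binaryVersion(fullVersion: String): String =
    fullVersion.split('.').take(2).mkString(".")

  // Spark publishes one artifact per Scala binary version: <name>_<binary version>
  def artifactId(name: String, scalaFullVersion: String): String =
    s"${name}_${binaryVersion(scalaFullVersion)}"

  def main(args: Array[String]): Unit = {
    println(artifactId("spark-streaming-kafka-0-8", "2.11.0"))
    // The Scala version actually on the classpath at runtime
    println(artifactId("spark-core", scala.util.Properties.versionNumberString))
  }
}
```

Mixing suffixes (for example a `_2.10` artifact with `_2.11` ones) typically fails at runtime with binary-incompatibility errors, so all Spark artifacts should share one suffix.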
The final configuration in pom.xml is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.kason.spark</groupId>
    <artifactId>spark_platform_learn</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11</scala.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <!-- use the declared Hadoop version (2.7.3) instead of a hard-coded 2.6.0 -->
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.39</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>
    <!-- Official Maven Central repository. Plain HTTP access was disabled in January 2020, so use the HTTPS URL. -->
    <repositories>
        <repository>
            <id>central</id>
            <name>Maven Repository Switchboard</name>
            <layout>default</layout>
            <url>https://repo1.maven.org/maven2</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <!-- JDK version used by Maven for compilation. This plugin compiles Java only;
                     building the Scala sources outside IDEA needs a Scala compiler plugin such as scala-maven-plugin. -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

SparkStreaming demo code:
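package com.scala.action.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by kason_zhang on 4/7/2017.
 */
object SparkStreaming {
  def main(args: Array[String]): Unit = {
    // local[3]: run locally with 3 threads; the socket receiver permanently occupies one,
    // so a local streaming job needs at least local[2] to process any data
    val conf = new SparkConf().setMaster("local[3]").setAppName("BasicStreamingExample")
    // Batch interval: 5 seconds
    val ssc = new StreamingContext(conf, Seconds(5))
    // Read newline-delimited text from the nc server running on the CentOS machine
    val lines = ssc.socketTextStream("10.64.24.78", 9999)
    val words = lines.flatMap(_.split(" "))
    // Count words within each batch
    val wc = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
    wc.print()
    // Writes one output directory per batch, named "<prefix>-<batch time in ms>",
    // e.g. D:\work\cloud\log\word.txt-1491875390000
    wc.saveAsTextFiles("D:\\work\\cloud\\log\\word.txt")
    println("pandas: sscstart")
    ssc.start()
    println("pandas: awaittermination")
    ssc.awaitTermination()
    println("pandas: done!")
  }
}

Install nc (netcat) on CentOS: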
yum install nc
Start a server listening on the port:
nc -lk 9999
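If `nc` is not available (for example on a Windows machine), a small Scala program can stand in for it: listen on a port and send newline-delimited text to whoever connects, which is what `socketTextStream` reads (it decodes the stream as UTF-8). This is a hypothetical sketch using only `java.net.ServerSocket`; `LineServer` and `serveOnce` are illustrative names, and unlike `nc -lk` it sends a fixed list of lines to a single client and then stops. If it runs on a different machine, `socketTextStream` has to point at that machine's address.

```scala
import java.io.{OutputStreamWriter, PrintWriter}
import java.net.ServerSocket
import java.nio.charset.StandardCharsets

object LineServer {
  // Accept one client and send each line terminated by '\n',
  // roughly like typing the lines into `nc -lk <port>`
  def serveOnce(server: ServerSocket, lines: Seq[String]): Unit = {
    val client = server.accept() // blocks until e.g. the Spark socket receiver connects
    try {
      val out = new PrintWriter(new OutputStreamWriter(client.getOutputStream, StandardCharsets.UTF_8))
      lines.foreach(line => out.print(line + "\n"))
      out.flush()
    } finally client.close()
  }

  def main(args: Array[String]): Unit = {
    val port = if (args.nonEmpty) args(0).toInt else 9999
    val server = new ServerSocket(port)
    println(s"Listening on port ${server.getLocalPort}")
    try serveOnce(server, Seq("hello world", "hell o kkl"))
    finally server.close()
  }
}
```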
Open port 9999 in the firewall:
[kason@kason Desktop]$ su
Password:
[root@kason Desktop]# /sbin/iptables -I INPUT -p tcp --dport 9999 -j ACCEPT
[root@kason Desktop]# /etc/init.d/iptables save
iptables: Saving firewall rules to /etc/sysconfig/iptables:[ OK ]
[root@kason Desktop]# service iptables restart
iptables: Setting chains to policy ACCEPT: filter [ OK ]
iptables: Flushing firewall rules: [ OK ]
iptables: Unloading modules: [ OK ]
iptables: Applying firewall rules: [ OK ]
[root@kason Desktop]# vi /etc/init.d/iptables
[root@kason Desktop]# /etc/init.d/iptables status
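Before starting the streaming job, it can help to confirm from the machine running IDEA that the port is actually reachable, since a blocked port otherwise tends to show up only as connection errors in the Spark log. A minimal sketch with `java.net.Socket`; `PortCheck.canConnect` is a hypothetical helper, and the host and port in `main` are the ones used in the demo above:

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object PortCheck {
  // True if a TCP connection to host:port succeeds within timeoutMs
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // connection refused, timed out (e.g. packets dropped by a firewall), or unknown host
      case _: IOException => false
    } finally socket.close()
  }

  def main(args: Array[String]): Unit = {
    val (host, port) = ("10.64.24.78", 9999)
    println(s"$host:$port reachable: ${canConnect(host, port)}")
  }
}
```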
Then run nc -lk 9999 on Linux and type the data to send on the lines below it:
[kason@kason Desktop]$ nc -lk 9999
hell owoo goo ndate better best goo ndate vi /e hello world hello world out hell o kkl portal hell o kkl
The output in IDEA is as follows:
-------------------------------------------
Time: 1491875390000 ms
-------------------------------------------
(o,1)
(kkl,1)
(hell,1)
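Each 5-second batch goes through the same `flatMap` / `map` / `reduceByKey` pipeline as the demo, and the counts above are consistent with a batch containing the single line "hell o kkl". As a sketch for checking the counting logic without a cluster, the per-batch computation can be written with ordinary Scala collections. `BatchWordCount.countWords` is a hypothetical helper; since the order in which `print()` lists the pairs is not something to rely on, the result is a `Map`.

```scala
object BatchWordCount {
  // Mirrors lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _) for one batch
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    countWords(Seq("hell o kkl")).foreach(println)
  }
}
```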
With that, a simple Spark Streaming demo developed in IDEA is complete.
Author: kason_zhang
Link: https://www.jianshu.com/p/fd28dcdf3a1b