Every job submitted through spark-submit needs a Main class as the program entry point, and the Spark job ends as soon as the Main class finishes. So what if an external program needs to feed data to a running Spark job in real time and get results back?
The idea is simple: keep the Spark job's main method from exiting, and let the external program communicate with the Spark job to exchange data.
There are many communication options, such as raw sockets, Netty, or an embedded Tomcat or Jetty, but for fast development Akka is a good choice.
Development falls into two parts: 1. the Spark job, which is submitted to the Spark cluster; 2. the external caller, which simulates client code. The two communicate via Akka actors.
Let's look at the Spark job side first.
SparkConfig defines the SparkContext object (no master is set in code; spark-submit supplies it with --master):

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object SparkConfig {
  val conf = new SparkConf().setAppName("testSpark")
  val sc = new SparkContext(conf)
}
DataService is the business class that performs the Spark RDD operations:

import com.sam.spark.demo.data.config.SparkConfig
import scala.collection.mutable.ArrayBuffer

class DataService {
  // Turn the incoming list into an RDD and return its maximum element
  def handler(list: ArrayBuffer[String]): String = {
    SparkConfig.sc.parallelize(list).max()
  }
}
Worker is an Akka Actor that receives input from outside, invokes DataService, and sends the result back:

import akka.actor.Actor
import com.sam.spark.demo.akka.msg.TextMessage
import com.sam.spark.demo.data.service.DataService
import scala.collection.mutable.ArrayBuffer

class Worker extends Actor {
  val dataService = new DataService()

  def receive = {
    // Note: the element type is erased at runtime, so this matches any ArrayBuffer
    case x: ArrayBuffer[String] => {
      val tm = new TextMessage()
      tm.msg = dataService.handler(x)
      sender ! tm // reply to the caller
    }
  }
}
TextMessage is the message object sent back to the caller; it must be Serializable because the Akka config below binds it to the JavaSerializer:

class TextMessage extends Serializable {
  var msg: String = null
}
AkkaConfig is the actor configuration class; it creates the actor system and the Worker:

import akka.actor.ActorSystem
import akka.actor.Props
import com.typesafe.config.ConfigFactory

object AkkaConfig {
  val system = ActorSystem("ReactiveEnterprise", ConfigFactory.load().getConfig("serverSystem"))
  val workerRef = system.actorOf(Props[Worker], "worker")
}
The entry class:

import com.sam.spark.demo.akka.AkkaConfig
import com.sam.spark.demo.data.config.SparkConfig

object AppStart {
  def main(args: Array[String]): Unit = {
    // Referencing the two objects forces their lazy initialization:
    // the SparkContext is created and the actor system starts listening
    SparkConfig
    AkkaConfig
  }
}
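Although main returns here, Akka's threads are non-daemon by default (akka.daemonic = off), so the JVM and the Spark driver stay alive. If you prefer to block explicitly, a minimal variant using the Akka 2.3 API that Spark 1.6 ships with would end main with:

    // Block until the actor system is shut down (Akka 2.3 API)
    AkkaConfig.system.awaitTermination()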
pom.xml. Note the AppendingTransformer for reference.conf: the shade plugin must concatenate the Akka reference configuration files found in the dependency jars, otherwise the fat jar typically fails at startup with a missing "akka.version" configuration error.

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.3</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.5</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
  </dependency>
</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.6.0</version>
      <configuration>
        <source>1.7</source>
        <target>1.7</target>
      </configuration>
    </plugin>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.2.2</version>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.0.0</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>Main class name</mainClass>
              </transformer>
              <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                <resource>reference.conf</resource>
              </transformer>
            </transformers>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
The Akka configuration file (server side):

serverSystem {
  akka {
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
      default-dispatcher {
        throughput = 2
      }
      serializers {
        java = "akka.serialization.JavaSerializer"
      }
      serialization-bindings {
        "fully qualified message class name" = java
      }
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "Akka remote host address"
        port = <Akka remote port>
      }
    }
  }
}
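For instance, filled in to match the address the client below connects to (the host and port are illustrative values; the message class name follows the package seen in the imports above):

serverSystem {
  akka {
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
      serializers {
        java = "akka.serialization.JavaSerializer"
      }
      serialization-bindings {
        "com.sam.spark.demo.akka.msg.TextMessage" = java
      }
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "192.168.1.1"   # must match the address the client connects to
        port = 2555
      }
    }
  }
}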
Package it (the explicit scala:compile goal compiles the Scala sources, since the scala-maven-plugin above has no lifecycle executions bound):
mvn clean scala:compile package -DskipTests=true
Submit to the Spark cluster:

./spark-1.6.3-bin-hadoop2.6/bin/spark-submit --class <Main class name> --master spark://<Spark master address> ./spark.demo-0.0.1-SNAPSHOT.jar
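A filled-in example, assuming AppStart lives in the com.sam.spark.demo package (an assumption based on the imports above) and the standalone master runs on 192.168.1.1 with the default port 7077:

./spark-1.6.3-bin-hadoop2.6/bin/spark-submit \
  --class com.sam.spark.demo.AppStart \
  --master spark://192.168.1.1:7077 \
  ./spark.demo-0.0.1-SNAPSHOT.jar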
Now the client implementation.

The local actor looks up the remote worker and sends messages to it:

import akka.actor.Actor
import akka.actor.ActorSelection
import com.sam.spark.demo.akka.msg.TextMessage
import scala.collection.mutable.ArrayBuffer

class Client extends Actor {
  // The path must match the server side: system name "ReactiveEnterprise",
  // the host/port from its netty.tcp settings, and the actor name "worker"
  var remoteActor: ActorSelection =
    context.actorSelection("akka.tcp://[email protected]:2555/user/worker")

  override def receive: Receive = {
    case msg: ArrayBuffer[String] => {
      // "!" records this actor as the sender, so the worker's TextMessage
      // reply is delivered back here
      remoteActor ! msg
    }
    case msg: TextMessage => {
      println(msg.msg)
    }
  }
}
A local Main class simulates sending messages to the remote actor:

import akka.actor.ActorSystem
import akka.actor.Props
import com.typesafe.config.ConfigFactory
import java.util.UUID
import scala.collection.mutable.ArrayBuffer

object ClientStart {
  def main(args: Array[String]): Unit = {
    val serverSystem = ActorSystem("clientSystem", ConfigFactory.load().getConfig("clientSystem"))
    val clientRef = serverSystem.actorOf(Props[Client], "client")
    while (true) {
      // Build a batch of 100 random strings and send it to the remote worker
      val list = new ArrayBuffer[String]
      for (i <- 1 to 100) {
        list += UUID.randomUUID().toString()
      }
      clientRef ! list
      Thread.sleep(500)
    }
  }
}
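The loop above is fire-and-forget; replies are printed inside Client. If the caller needs a synchronous result instead, the ask pattern can target the remote worker directly. A minimal sketch, meant to run inside main where serverSystem and list are in scope (the actor path and timeout are illustrative):

import akka.pattern.ask
import akka.util.Timeout
import com.sam.spark.demo.akka.msg.TextMessage
import scala.concurrent.Await
import scala.concurrent.duration._

// Resolve the remote worker and block until its TextMessage reply arrives
implicit val timeout = Timeout(10.seconds)
val remote = serverSystem.actorSelection(
  "akka.tcp://[email protected]:2555/user/worker")
val future = remote ? list
val reply = Await.result(future, timeout.duration).asInstanceOf[TextMessage]
println(reply.msg)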
The client's Akka configuration file. (There is no remote section here; with RemoteActorRefProvider, the defaults from akka-remote's reference.conf apply, so the client listens on the default port 2552 to receive replies.)

clientSystem {
  akka {
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
      default-dispatcher {
        throughput = 2
      }
      serializers {
        java = "akka.serialization.JavaSerializer"
      }
      serialization-bindings {
        "fully qualified message class name" = java
      }
    }
  }
}
Author: SamHxm
Link: https://www.jianshu.com/p/19cfca528ac7