Using Akka to Get Results Back from a Spark Job

Tags:
Spark

Every job submitted via spark-submit has to specify a Main class as the program's entry point, and the Spark job ends when that Main class finishes. So what if an external program needs to feed data to a running Spark job in real time and get results back?

The idea is simple: keep the Spark job's main method from exiting, and let the external program communicate with the Spark job to exchange data.

There are plenty of ways to communicate: sockets, Netty, or an embedded Tomcat or Jetty. For quick development, though, Akka is a good choice.

Development splits into two parts: 1. the Spark job itself, which gets submitted to the Spark cluster; 2. the external caller, which plays the role of the client. The two sides communicate through Akka actors.

First, the Spark job side.

SparkConfig defines the SparkContext object:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object SparkConfig {
  val conf = new SparkConf().setAppName("testSpark")
  val sc = new SparkContext(conf)
}

DataService is the business class that invokes the Spark RDD operations:

import com.sam.spark.demo.data.config.SparkConfig
import scala.collection.mutable.ArrayBuffer

class DataService {

  // Parallelize the incoming strings into an RDD and return the largest one.
  def handler(list: ArrayBuffer[String]): String = {
    SparkConfig.sc.parallelize(list).max()
  }
}
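Since handler ships plain strings, RDD.max() resolves the implicit Ordering[String], so the "largest" element is the lexicographically greatest string, exactly the comparison an ordinary Scala collection uses. A minimal sketch of that ordering (pure Scala, no Spark):

```scala
object MaxDemo {
  // Stand-ins for the UUID strings the client sends.
  val ids = Seq("0a1b", "ffee", "93c7")

  // Spark's RDD.max() uses the same implicit Ordering[String], so the result
  // is the lexicographically greatest string, not the longest or the last one.
  val result: String = ids.max
}
```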

Worker is the Akka Actor: it receives the external input, calls the DataService object, and sends back the result:

import akka.actor.Actor
import com.sam.spark.demo.data.service.DataService
import com.sam.spark.demo.akka.msg.TextMessage
import scala.collection.mutable.ArrayBuffer

class Worker extends Actor {

  val dataService = new DataService()

  def receive = {
    case x: ArrayBuffer[String] =>
      val tm = new TextMessage()
      tm.msg = dataService.handler(x)
      sender ! tm
  }
}
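One caveat about the `case x: ArrayBuffer[String]` pattern: the JVM erases the `String` type argument at runtime, so any ArrayBuffer matches this case regardless of its element type (the compiler emits an "unchecked" warning for it). That is harmless here because the client only ever sends ArrayBuffer[String], but it is worth knowing. A small demonstration:

```scala
import scala.collection.mutable.ArrayBuffer

object ErasureDemo {
  // An ArrayBuffer of Ints, viewed through Any (as a received actor message would be).
  val msg: Any = ArrayBuffer(1, 2, 3)

  // The String type argument is erased at runtime, so this case matches anyway.
  val matchedAsStrings: Boolean = msg match {
    case _: ArrayBuffer[String] => true
    case _                      => false
  }
}
```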

TextMessage is the message object used for replies:

class TextMessage extends Serializable {
  var msg: String = null
}

AkkaConfig is the actor configuration class; it sets up the ActorSystem and creates the Worker actor:

import akka.actor.ActorSystem
import akka.actor.Props
import com.typesafe.config.ConfigFactory

object AkkaConfig {

  val system = ActorSystem("ReactiveEnterprise",ConfigFactory.load().getConfig("serverSystem"))

  val workerRef = system.actorOf(Props[Worker], "worker")
}

The entry-point class:

import com.sam.spark.demo.akka.AkkaConfig
import com.sam.spark.demo.data.config.SparkConfig

object AppStart {
  def main(args: Array[String]): Unit = {
    // Referencing the objects forces their initialization: the SparkContext
    // and the ActorSystem are created right here.
    SparkConfig
    AkkaConfig
  }
}
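The bare `SparkConfig` and `AkkaConfig` expressions look odd but are deliberate: a Scala `object` is a lazily initialized singleton whose body runs on first access, so naming the objects in main is what actually builds the SparkContext and the ActorSystem. After main returns, Akka's non-daemon threads keep the JVM (and hence the Spark driver) alive. A pure-Scala sketch of that initialization-on-first-access behavior:

```scala
object InitOrderDemo {
  val events = scala.collection.mutable.ArrayBuffer.empty[String]

  // A Scala `object` body runs on first access, not at program start.
  object Service {
    events += "Service initialized"
  }

  def run(): Int = {
    events += "before access"
    Service            // bare reference forces initialization, like SparkConfig / AkkaConfig in main
    events += "after access"
    events.size
  }
}
```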

pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.3</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.5</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>

    <build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.7</source>
                <target>1.7</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>your Main class name</mainClass>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                        </transformers>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

The server-side Akka configuration file:

serverSystem {
    akka {
        actor {
            provider = "akka.remote.RemoteActorRefProvider"
            default-dispatcher {
                throughput = 2
            }
            
            serializers {
                java = "akka.serialization.JavaSerializer"
            }
            
            serialization-bindings {
                "message class name to serialize" = java
            }
        }
        remote { 
            enabled-transports = ["akka.remote.netty.tcp"] 
            netty.tcp { 
                hostname = "Akka remote host address"
                port = Akka remote port
            } 
        }
    }
}
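The serialization-bindings placeholder names the message classes that cross the wire. For this demo, those are the TextMessage replies and the ArrayBuffer[String] batches the client sends, so a plausible concrete fragment (assumed, adjust to your own packages) would be:

```
serialization-bindings {
    "com.sam.spark.demo.akka.msg.TextMessage" = java
    "scala.collection.mutable.ArrayBuffer" = java
}
```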

Package it:

mvn clean scala:compile package -DskipTests=true

Deploy to the Spark cluster:

./spark-1.6.3-bin-hadoop2.6/bin/spark-submit --class <Main class name> --master spark://<Spark master address> ./spark.demo-0.0.1-SNAPSHOT.jar

Now for the client implementation.

The local actor obtains a selection of the remote actor and sends it messages:

import akka.actor.Actor
import akka.actor.ActorSelection
import com.sam.spark.demo.akka.msg.TextMessage
import scala.collection.mutable.ArrayBuffer

class Client extends Actor {

  // system.actorOf(Props[Worker], "worker") on the server places the actor at
  // /user/worker, so the selection path is system name, host, port, then /user/worker.
  var remoteActor: ActorSelection =
    context.actorSelection("akka.tcp://ReactiveEnterprise@<server host>:2555/user/worker")

  override def receive: Receive = {
    case msg: ArrayBuffer[String] =>
      remoteActor ! msg
    case msg: TextMessage =>
      println(msg.msg)
  }
}

The local Main class, simulating messages sent to the remote actor:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import akka.actor.Props
import akka.pattern.Patterns
import scala.concurrent.duration.Duration
import scala.concurrent.Await
import akka.util.Timeout
import java.util.concurrent.TimeUnit
import java.util.UUID
import scala.collection.mutable.ArrayBuffer

object ClientStart {
  def main(args: Array[String]): Unit = {

    val clientSystem = ActorSystem("clientSystem", ConfigFactory.load().getConfig("clientSystem"))
    val clientRef = clientSystem.actorOf(Props[Client], "client")

    // Fire-and-forget: send a batch of 100 UUID strings every 500 ms.
    while (true) {
      val list = new ArrayBuffer[String]
      for (i <- 1 to 100) {
        list += UUID.randomUUID().toString()
      }
      clientRef ! list
      Thread.sleep(500)
    }

    // Request/reply alternative using the ask pattern:
    // val future = Patterns.ask(clientRef, "world", Timeout.apply(10L, TimeUnit.SECONDS))
    // val result = Await.result(future, Duration.create(10, TimeUnit.SECONDS))
    // println(result)
  }
}
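The `!` send above is fire-and-forget: it returns nothing, and replies come back through the Client actor's receive. The commented-out `Patterns.ask` alternative instead returns a Future that the caller can block on with Await.result. A pure-Scala sketch of that request/reply shape, using a Promise as a stand-in for the remote Worker (no Akka required):

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object AskSketch {
  // The Promise plays the remote Worker; the Future is what Patterns.ask
  // would hand back to the caller.
  def askWorker(input: Seq[String]): Future[String] = {
    val reply = Promise[String]()
    reply.success(input.max)   // the "worker" computes and replies immediately here
    reply.future
  }

  // Block for the reply, as Await.result does in the commented-out code.
  val result: String = Await.result(askWorker(Seq("a", "c", "b")), 10.seconds)
}
```

Blocking with Await is fine in a demo's main method, but inside an actor you would normally pipe the Future back as a message rather than block.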

The client-side Akka configuration file:

clientSystem {
    akka {
        actor {
            provider = "akka.remote.RemoteActorRefProvider"
            default-dispatcher {
                throughput = 2
            }
            
            serializers {
                java = "akka.serialization.JavaSerializer"
            }
            
            serialization-bindings {
                "message class name to serialize" = java
            }
        }
    }
}



Author: SamHxm
Link: https://www.jianshu.com/p/19cfca528ac7

