亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定

Spark各組件功能簡單理解(quick start)

標簽:
Spark

各个组件

  • conf/spark-env.sh 配置spark的环境变量

  • conf/spark-default.conf 配置spark应用默认的配置项和spark-env.sh有重合之处,可在提交应用时指定要用的配置文件(spark-submit ..... --properties-file FILE 设置应用程序属性的文件放置位置,默认是conf/spark-defaults.conf)

  • bin/spark-shell 一个scala/java的spark REPL工具或者说命令行工具,可用来交互的跑一些脚本,或者测试环境之类

  • bin/spark-submit 部署spark应用

  • sbin/start-all.sh spark-standalone方式启动

  • sbin/start-thriftserver.sh 启动thriftserver,提供了一个jdbc接口查询hive数据库

资源调度

根据spark的部署方式的不同,会有不同的组件执行资源调度(资源主要指cpu,memory)

  • spark on yarn/mesos 由yarn/mesos来做资源调度

  • standalone由spark自己来做资源调度

任务调度

Spark应用提交后经历了一系列的转换,最后成为Task在每个节点上执行。

  1. RDD的Action算子触发Job的提交

  2. 提交到Spark中的Job生成RDD DAG

  3. 由DAGScheduler转化为Stage DAG

  4. 每个Stage中产生相应的Task集合

  5. TaskScheduler将任务分发到Executor执行

  6. 每个任务对应相应的一个数据块,使用用户定义的函数处理数据
    块。

更详细内容 : https://www.2cto.com/net/201712/703266.html

spark 基础操作

  • 主要抽象是RDD(Resilient Distributed Dataset,弹性分布式数据集),它可被分发到集群各个节点上,进行并行操作。RDDs 可以通过 Hadoop InputFormats 创建(如 HDFS),或者从其他 RDDs 转化而来。

  • RDD可以读取hdfs或者local的文件,可以读取text, csv, json, parquet, orc等格式的文件。

  • RDD支持两种类型的操作(算子):

    • action 在数据集上运行计算后返回值

    • transformation转换, 从现有数据集创建一个新的数据集

  • transformation不会立即执行,等到一个action算子触发任务提交

  • spark会根据代码生成DAG图来执行任务的调度

  • rdd结合schema可以创建DataFrame,可以执行一些存表,SQL式的join,select,filter,groupby等操作。

spark-streaming 基础操作

spark的流式处理框架

  • 支持kafka,flume,socket,text等方式创建Stream

  • 生成的对象为DStreamRDD,通过分时生成的RDD来进行操作

  • 支持常用的RDD transformation (map, flatmap,filter,repartition,union,couont,reduce等)

  • 可以通过transfrom算子,将DStream作为rdd进行操作: transform(lambda rdd: func(rdd))

  • 读取kafka消息有两种方式,high api式(带receiver,自动处理offset),low api式(直接操作kafka的offset),spark推荐使用low api式,自己操作offset来保证不冗余的读取,不漏读消息

  • 对于python的spark-streaming,可以借助kazoo包来将offset保存到zookeeper

  • 可以使用checkpoint来进行一定的容错

  • 更详细可以参考:https://blog.csdn.net/weixin_35602748/article/details/78668054

pyspark-streaming demo

from pyspark.sql import SparkSessionfrom pyspark.streaming import StreamingContextfrom pyspark.streaming.kafka import KafkaUtilsdef parse_msg(spark, rdd):
    passif __name__ == "__main__":

    master = "yarn"
    appName = "AppName"
    kafka_params = {        "bootstrap.servers": "master:9092",  # metadata.broker.list or bootstrap.servers
        "auto.offset.reset": "smallest"
    }

    topics = ["logger"]
    spark = SparkSession \
        .builder \
        .appName(appName) \
        .getOrCreate()    # 优先使用sparkSession(spark2.0的方式),这样sparkContext比较容易得到,不用在自定义函数中额外传参
    sc = spark.sparkContext
    ssc = StreamingContext(sc, 10)  # 10s的数据做一次处理

    directKafkaStream = KafkaUtils.createDirectStream(ssc, topics, kafka_params)    # get message strings from kafka
    lines = directKafkaStream.map(lambda x: x[1])    # transform可以把DStream转换为rdd进行操作
    logJsonStrRDD = lines.transform(lambda rdd: parse_msg(spark, rdd))    # pprint必须有,必须有一个输出,否则提示没法注册,报错: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
       # pprint = pretty print 用更好的格式来打印
    logJsonStrRDD.pprint(5)  

    ssc.start()
    ssc.awaitTermination()



作者:祗談風月
链接:https://www.jianshu.com/p/6fb8089a8c1a


點擊查看更多內容
TA 點贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優質文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學習,寫下你的評論
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦
今天注冊有機會得

100積分直接送

付費專欄免費學

大額優惠券免費領

立即參與 放棄機會
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號

舉報

0/150
提交
取消