-
RDDS查看全部
-
RDDs之SparkContext sc查看全部
-
RDD之Driver program查看全部
-
Spark
Spark簡介
Spark是什么:
Spark是一個快速且通用的集群計算平臺
Spark的特點
Spark是快速的
Spark擴充了流行的MapReduce計算模型
Spark是基于內存的計算?
Spark是通用的
Spark的設計容納了其他分布式系統擁有的功能,批處理,迭代式計算,交互查詢和流處理等。
優點:降低了維護成本
Spark是高度開放的
Spark提供了Python, Java, Scala, SQL的API和豐富的內置庫。
Spark和其他的大數據工具整合的很好,包括Hadoop, Kafka等
Spark歷史
?誕生于2009年,加州大學伯克利分校RAD實驗室的一個研究項目,最初是基于Hadoop MapReduce
發現MapReduce在迭代式計算和交互式上抵消,引入內存存儲
2010年3月份Spark開源
2011年AMP實驗室在Spark上開發高級組件,像Spark Streaming
2013年轉移到了Apache下,不久便成為頂級項目了。
Spark組件:Spark包含多個緊密集成的組件
Spark Core:?
包含Spark的基本功能,包含任務調度,內存管理,容錯機制等。
內部定義了RDDs(彈性分布式數據集)
提供了很多API來創建和操作這些RDDs。
應用場景,為其他組件提供底層的服務。
Spark SQL:
是Spark處理結構化數據的庫,就像Hive SQL,MySQL一樣。
應用場景,企業中用來做報表統計。
Spark Streaming:
是實施數據流處理的組件,類似Storm。
Spark Streaming提供了API來操作實施流數據。
應用場景,企業中用來從Kafka接受數據做實時統計。
Mlib:
一個包含通用機器學習功能的包,Machine learning lib.
包含分類,聚類,回歸等,還包括模型評估,和數據導入。
Milb提供的上面這些方法,都支持集群上的橫向擴展。
應用場景,機器學習。
Graphx:
是處理圖的庫(例如,社交網絡圖),并進行圖的并行計算。
像Spark Streaming, Spark SQL一樣, 它也集成了RDD API。
它提供了各種圖的操作,和常用的圖算法,例如PangeRank算法。
應用場景,圖計算
Cluster Managers:
就是集群管理,Spark自帶一個集群管理是單獨調度器。
常見集群管理包括Hadoop YARN, Apache Mesos。
緊密集成的優點:
Spark底層優化了,基于Spark底層的組件,也得到了相應的優化。
緊密集成,節省了各個組件組合使用時的部署,測試等時間。
向Spark增加新的組件時,其他組件,可立刻享用新組件的功能。
Spark與Hadoop的比較
Hadoop應用場景
離線處理
對時效性要求不高
Spark應用場景
時效性要求高的場景
機器學習等領域
比較
Doug Cutting(Hadoop之父)的觀點
這是生態系統,每個組件都有其作用,各善其職即可。
Spark不具有HDFS的存儲能力,要借助HDFS等持久化數據。
大數據將會孕育出更多的新技術。
Spark安裝
Spark運行環境
Spark 是scala寫的,運行在JVM上,所以運行環境Java7+
如果使用Python API,需要安裝Python 2.6+或者Python3.4+。
Spark 1.6.2 -Scala 2.10? Spark 2.0.0 -Scala 2.11
Spark下載:
下載地址:http://spark.apache.org/downloads.html
搭Spark不需要Hadoop,如有hadoop集群,可下載相應的版本解壓。
Spark目錄
bin:包含用來和Spark交互的可執行未見,如Spark shell。
core, streaming, python, ...包含主要組件的源代碼。
examples包含一些單機的Spark jobs,可以單機運行的例子。
Spark的Shell
Spark的shell使你能夠處理分布在集群上的數據。
Spark把數據加載到節點的內存中,因此分布式處理可在秒級完成。
快速使迭代式計算,實時查詢,分析一般能夠在shell中完成。
Spark提供了Python shells和Scala shells。
Python Shell:
bin/pyspark
Scala Shell
bin/spark-shell
```
var lines = sc.textFile("../testfile/helloSpark")
lines.conut()
lines.first()
修改日志級別log4j.rootCategory=WARN, console
```
Spark開發環境的搭建
Scala安裝:
IDEA下載
插件安裝:Scala
搭建開發環境常遇到的問題
網絡問題,導致sbt插件下載失敗,解決方法,找一個好的網絡環境。
版本匹配問題:Scala2.10.5, jdk1.8
IntelliJ IDEA 常用設置
開發第一個Spark程序
配置ssh無密登陸:
ssh-keygen
.ssh目錄下cat xxx_rsa.pub > authorized_keys
chmod 600 authorized_keys?
WordCount:
創建一個Spark Context
加載數據
把每一行分割成單詞
轉換成pairs并且計數
RDDs介紹
Driver program:
包含程序的main()方法,RDDs的定義和操作。
它管理很多節點,我們稱作executors。
SparkContext:
Driver programs通過SparkContext對象訪問Spark。
SparkContext對象代表和一個集群的連接。
在Shell中SparkContext自動創建好了,就是sc。
RDDs:
Resilient distributed datasets(彈性分布式數據集,簡稱RDDs)
這些RDDs,并行的分布在整個集群中。
RDDs是Spark分發數據和計算的基礎抽象類。
一個RDD是一個不可改變的分布式集合對象。
Spark中,所有的計算都是通過RDDs的創建,轉換,操作完成的。
一個RDD內部有很多partition(分片)組成的。
分片:
每個分片包括一部分數據,partitions可在集群不同節點上計算。
分片是Spark并行處理的單元,Spark順序的,并行的處理分片。
RDDs的創建方法:
把一個存在的集合傳給SparkContext的parallelize()方法,測試用
val rdd = sc.parallelize(Array(1, 2, 2, 4), 4)
第1個參數:待并行化處理的集合
第2個參數:分區個數
加載外部數據集
val rddText = sc.textFile("helloSpark.txt")
Scala的基礎知識
Scala的變量聲明
創建變量是val/var
val:變量值是不可修改的,類似java final。
var:變量值定義完是可以修改的。
Scala的匿名函數和類型推斷
lines.filter(line => line.contains("world")),定義一個匿名函數,接受line.
使用line這個Strig類型的變量上的contains方法,并且返回結果
line不需要制定類型,會自動推斷
RDD基本操作 Transformation
Transformations介紹:
轉換
從之前的RDD構建一個新的RDD,像map() 和 filter()。
逐元素Transformation
map(): map()接收函數,把函數應用到RDD的每一個元素,返回新的RDD。
filter(): filter()接受函數,返回只包含滿足filter()函數的元素的新RDD。
flatMap():?flatMap()對每個輸入元素,輸出多個輸出元素。flat壓扁的意思,將RDD中的元素壓扁后返回一個新的RDD。
集合運算
RDDs支持數學集合的計算,例如并集,交集計算。
RDD基本操作 Action
Action介紹:在RDD上計算出來一個結果。把結果返回給driver program或保存在文件系統,count(), save()
collect(): 返回RDD的所有元素。
count(): 計數。
countByValue(): 返回一個map表示唯一元素出現的個數。
take(num): 返回幾個元素。
top(num): 返回前幾個元素。
takeOrdered(num)(ordering): 返回基于提供的排序算法的前幾個元素。
takeSample(withReplacement, num, [seed]): 取樣例。
reduce(fun): 合并RDD中元素。
fold(zero)(fun): 與reduce()相似提供zero value。
aggregate(zeroValue)(seqOp, combOp): 與fold()相似,返回不同類型。
foreach(fun): 對RDD的每個元素作用函數,什么也不返回
RDDs的特性
RDDs的血統關系圖:
Spark維護著RDDs之間的依賴關系和創建關系,叫做血統關系圖
Spark使用血統關系圖來計算每個RDD的需求和恢復丟失的數據
延遲計算(Lazy Evaluation):
Spark對RDDs的計算是,他們第一次使用action操作的時候。
這種方式在處理大數據的時候特別有用,可以減少數據的傳輸。
Spark內部記錄metadata表明transformations操作已經被響應了。
加載數據也是延遲計算,數據只有在必要的時候,才會被加載進去。
RDD.persist(): 持久化
默認每次在RDDs上面進行action操作時,Spark都重新計算RDDs,如果像重復利用一個RDD,可以使用RDD.persist()。
unpersist()方法從緩存中移除。
例子RDD.persist()
KeyValue對RDDs
創建KeyValue對RDDs
使用map()函數,返回key/value對,例如,包含數行數據的RDD,把每行數據的第一個單詞作為keys
KeyValue對RDDs的Transformations
reduceByKey(fun): 把相同Key的結合。
groupByKey(): 把相同的key的values分組。
combineByKey(): 把相同key的結合,使用不同的返回類型。
mapValues(fun): 函數作用于pairRDD的每個元素,key不變
flatMapValues(fun): 符號化的時候使用
keys: 僅返回keys
values: 僅返回values
sortByKey(): 按照key排序的RDD
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
最常用的基于key的聚合函數,返回的類型可以與輸入類型不一樣,許多基于key的聚合函數都用到了它,像groupByKey()
原理,便利partition中的元素,元素的key,要么之前見過的,要么不是。如果是新元素,使用我們提供的createCombiner()函數,如果是這個partition中已經存在的key,就會使用mergeValue()函數,合計每個partition的結果的時候,使用mergeCombiners()函數
Spark基礎課程總結
Spark介紹。
Spark安裝,開發環境搭建,WordConut程序開發和運行。
RDDs介紹,Transformations,Actions
RDDs的特性,KeyValue對RDDs
后續課程:
Spark架構
Spark運行過程
Spark程序部署
查看全部 -
reduce查看全部
-
Prest查看全部
-
開發第一個Spark程序放到集群上運行
查看全部 -
scala基礎知識
變量聲明:
創建變量必須使用val /var
????區別:val:變量值不可修改,一旦分配不能重新指向別的值
? ? ? ? ? ? ? var:分配后,可以指向類型相同的值
查看全部 -
rdds創建方法
查看全部 -
分片:
每個分片包含一部分數據,partitions可在集群不同節點上計算。
分片是spark并行處理的單元,spark順序的、并行的處理分片
查看全部 -
一個RDD是一個不可改變的分布式集合對象。
Spark中,所有的計算都是通過RDDs的創建、轉換和操作完成的。
一個RDD內部由很多partitions(分片)組成
查看全部 -
RDDs介紹:
RDDs: Resilient distributed datasets(彈性分布式數據集,簡稱RDDs)
RDDs是Spark分發數據和計算的基礎抽象類
查看全部 -
spark context:
Driver programs 通過sparkContext對象訪問spark
SparkContext對象代表和一個集群的連接
在shell中sparkContext自動創建好了,就是sc
查看全部 -
Driver program:
包含程序的main()方法,RDDs的定義和操作。
它管理很多節點,我們稱作executors
查看全部 -
環境搭建,spark和Scala版本匹配
查看全部
舉報