-
KeyValue對RDDS 創建KeyValue對RDDS: 使用map()函數,返回key/value對 eg: 包含數行數據的RDD,把每一行數據的第一個單詞作為keys; ?val rdd=sc.textFile("/home/1707498/YC_test/words.txt") ?rdd.foreach(println) ? ?var rdd2=rdd.map(line=>(line.spilt(" ")(0),line)) 每一行的第一個座位key,整行數據座位value ? KeyValue對Rdds的Transformation eg:val rdd=sc.parallelize(Array((1,2),(3,4),(3,6))) rdd.reduceByKey(func): 把相同的key結合 eg:rdd.reduceByKey((x,y)=>x+y) ?結果:{1,2),(3,10)} rdd.groupByKeyByKey(func):把相同key的values分組 eg:rdd.groupByKeyByKey((x,y)=>x+y) ?結果:{(1,[2]),(3,[4,6])} rdd.mapValues(func):函數作用于pairRDD的每個元素,key不變 eg:rdd.mapValues(x=>x+1) ?結果:{(1,3),(3,5),(3,7)} rdd.flatMapValues(func):符號化的時候使用; eg:rdd.flatMapValues(x=>x to 5) ?結果:{(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)} keys():僅返回keys values():僅返回values sortByKey():按照key排序的RDD *重要 combinByKey(createCombiner,mergeValue,mergeCombiners,partitioner):把相同的key結合,使用不同的返回類型 最常用的聚合函數,返回的類型可以與輸入類型不一樣,許多; 遍歷partition中的元素,元素的key,要么之前見過的,要么不是(rdd中很多個分區組成); 如果是新元素key,使用我們提供的createCombiner()函數(相當于初始化) 如果是這個partition中已經存在的key,就會使用mergeValue()函數(相當于整合) 合計每個partition的結果的時候,使用mergeCombiners()函數 eg:求平均值 val scores=sc.parallelize(Array(("jake",80.0),("jake",90.0),("jake",90.0),("mary",89.0),("mary",40.0),("mary",90.0)) scores.foreach(println) val score2=scores.combinByKey(x=>(1,x),(c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Double),c2:(Int,Double)=>(c1._1+c2_1,c1._2+c2._2))) x=>(1,x) ? 想要求平均值,需要知道科目的總和,科目的個數,每遍歷一個新key記1 c1:(Int,Double) ? ?int表示累計科目數,double累計分數 c1._1+1,c1._2+newScore ?c1._1取的第一個值,遇到新的key就加1 ?c1._2+newScore 分數累加 c1:(Int,Double),c2:(Int,Double)=>(c1._1+c2_1,c1._2+c2._2) ?每一個分區中匯總的科目數、分數匯總 ? val score2=scores.combinByKey(x=>(1,x),(c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Double),c2:(Int,Double)=>(c1._1+c2_1,c1._2+c2._2))) val average=score2.map{case(name,(num,score))=>(name,score/num)} case(name,(num,score)) ?判斷傳遞過來的類型是否正確 name,score/num) 正確的話便執行求均值
查看全部 -
errorsRDD和waringsRDD都是inputRDD經過filter操作后生成的新的RDD,這兩個RDD經過union操作后生成新的badLinesRDD,這樣一步一步組成血統關系圖
延遲計算
????Spark對DDS的計算是在第一次使用action操作的時候才使用;
????這種方式在處理大數據的時候特別有用,可以減少數據的傳輸(因為在第一次ation時才使用);
????Spark內部記錄metadata,表名transformations操作已經被響應了
????加載數據也是延遲計算,數據只有在必要的時候,才會被加載進去(只有使用的時候才會被加載進去)
RDD.persist():
????默認每次在RDDS上面進行action操作時,Spark都重新計算RDDS;
????如果想重復利用一個RDDD,可以使用RDD.persist()(如還想使用上述union的BadLinesRDD會從inputRDD開始action一遍,RDD.persist則無需重復上訴過程)
????unpersist()方法從緩存中移除;
????
????
????
查看全部 -
RDDS介紹
SparkContext:代表和集群的連接
????Driver Program 通過SparkContext對象訪問Spark;
????代表和一個集群的連接
????在shell中SparkContext自動創建好了,就是sc
RDDS:
????eg:var lines=sc.textfile("/rrr.txt")
????彈性分布式數據集(把yyy.txt的數據加載到彈性分布式數據集lines中)
????并行分布在整個集群中(如果有500G的數據,分成5塊放在不同的集群中);
????是Spark分發數據和計算的基礎抽象類;
????是一個不可改變的彈性分布式數據集(增刪改后相當于生成了一個新的RDD)
????Spark中所有的計算都是通過RDDs創建、轉換操作完成的;
????????一個RDDS的內部有很多分片組成的(如果有500G的數據,分成5塊放在不同的集群中,每臺100G)
分片:
每個分片包括一部分數據,partions在在集群不同節點上計算;
分片是spark并行處理的單元,Spark是順序的、并行的處理分片
RDDS的創建方法:
把一個存在的集合傳給SparkContext的parallelize()方法;
eg: ?val rdd =sc.parallelize(Array(1,2,2,4),4)
第一個參數:待并行化處理的集合,第二個參數:分區個數
rdd.foreach(print) ?遍歷
rdd.foreach(println) 換行,多次打印順序不一致,因為分成4個分片,先取到那個分片是隨機的;
加載外部數據集(很多方法,也可以加載集群上的數據);
eg: val rddText=sc.textTextFile("yyy.txt")
Scala的變量聲明:
創建變量的時候,必須使用val或者var;
val:變量值不可修改,一旦分配不能重新指向別的值
var:分配后,可以指向類型相同的值;
Scala的匿名解析和類型解析:
eg: lines.filter(line => line.contains("world"))
定義一個匿名函數,接收一個參數line指向包含‘word’的行
使用line這個String類型變量上的contains方法,并且返回結果;
line的類型不需要指定,能夠推斷出來
查看全部 -
20180924:講的非常一般,看了十分鐘,不想繼續了,先放著吧。查看全部
-
Spark組件:
Spark core(其余組件軍繼承了RDD API):
????包含Spark基本功能,任務調度,內存管理,容錯機制等;
????內部定義了RDDs(彈性分布式數據集);
????提供了很多APIs來創建這些RDDs;
????應用場景,為其他組件提供底層服務;
Spark SQl:
????是Spark處理結構化數據的庫,類似hive sql,mysql;
????應用場景:企業中做報表統計
Spark Streaming:
????是實時數據流處理組件,類似Storm;
????Spark Streaming提供了API來操作實時流數據;
????應用場景,企業中用來從kafka接收數據作實時統計
Mlib:
????一個包含通用機器學習功能的包,Machine learnimg lib;
????包含分類,聚類,回歸等,還包括模型評估和數據導入;
????MLIB提供以上的方法都支持集群上的橫向擴展(python 單機,);
????應用場景:機器?學習
Graphx:
????是處理圖的庫(如社交網絡圖),并進行圖的并行計算;
????提供了各種圖的操作,和常用的圖算法(如PangeRank);
????應用場景:圖計算;
Cluster Managers:
????集群管理,Spark自帶一個集群 管理是單獨調度器;
????長健集群關鍵包括hadoop yarn,Apache ?Mesos;
緊密集成的優點:
????Spark底層優化了,基于Spark底層的組件也得到了相應的優化;
????緊密集成,節省和各個組件組合使用時的部署,測試等時間;
????向Spark增加新的組件時,其他組件可立刻享用新組件的功能;
查看全部 -
Spark
快速且通用的集群計算平臺
特點:
快速:
擴充了流行的MR計算模型(如MR是分/時計算,那么Spark就是秒/分及計算)
基于內存的計算(計算過程難免產生中間結果,中中間結果放在內存比硬盤快很多)
通用:
容納了其他分布式系統擁有的功能
包括批處理(如Hadoop),迭代式計算(其他機器學習系統),交互式查詢(如hive)和流處理(如stom)
優點:大量降低了維護成本;
高度開放:
提供了python、java、scala、sql的api和豐富的內置庫;
和其他的大數據工具整合的很好,包括hadoop、kafka等等
查看全部 -
存下學習查看全部
-
存下學習查看全部
-
把數據加載到節點的內存中,使得分布式處理在秒級完成
查看全部 -
1.spark持久化的集中方式
查看全部 -
spark大綱
查看全部 -
創建keyvalue查看全部
-
persist緩存級別補充查看全部
-
persist緩存級別查看全部
-
persist函數,使得可以重復利用rdd查看全部
舉報