-
先把計算和調度管理解耦。
HDFS的主節點可以支持兩個以上。
查看全部 -
分別是:分布式存儲系統,分布式計算框架,集群管理和調度(給程序分配資源)。
Yarm的數據來源和去向都是HDFS.
在Yarm上運行很多的計算框架,例如mapreduce.
HDFS架構分析:
分布式存儲:由HDFS決定數據存儲在哪個從節點上。
支持主從架構:
Map Reduce架構:
map體現在代碼中就是一個類。
reduce就是一個聚合統計程序。
Yarm架構:
總結:數據存儲和資源調度都是分布式的主從結構。
查看全部 -
hadoop里面的分布式計算。
通過程序從數據庫拉取數據的過程非常慢。
mysql存儲在磁盤,磁盤io,即把磁盤數據讀到內存里面,再通過網絡,傳到計算程序里面,這兩個是造成慢的主要原因。主要原因是網絡io。
即發生了移動數據。
所以考慮把計算程序傳輸到數據所在的節點。
即執行本地計算,就只需做一個磁盤io。
分布式計算:
在本地執行本地計算,多臺機器執行,每臺計算局部計算。
全局匯總,此時數據集合的傳輸量比較少,網絡io消耗少。
查看全部 -
分布式存儲,單機的存儲能力有限,運用到多臺機器的存儲能力。
如何設備一個分布式存儲系統。
弊端:如何同時有很多請求同時過來,文件系統的請求會阻塞。
主從架構,你想要操作的數據到底在哪個從節點上,然后客戶端直接操作從節點。
主要流程:
查看全部 -
大數據在linux上運行和操作,安裝部署、排查、基本的命令。
linux里面的一門shell腳本,如何開發 調試? 執行腳本就行了。
javaSE內容,大多數都是java開發,不需要javaweb內容,使用IDEA工具。
數據存儲在mysql數據庫中。
查看全部 -
核心是數據清洗和計算的邏輯。前端用bi實現
查看全部 -
11
查看全部 -
Tomcat
/webapps/web項目/WEB_INF/classes/config數據庫配置文件
main.db.driver= com.mysql.cj.jdbc.Driver
main.db.url = jdbc:mysql://localhost:3306/data?serverTimezone=UTC
main.db.user= root
main.db.password = admin
查看全部 -
Sqoop
mapreduce ←→ mysqlsqoop配置:
1. sqoop-env-template改名sqoop-env.sh
2. SQOOP_HOME
3. mysql驅動jar包,添加到Sqoop的lib目錄下
4. 本地安裝mysql和開放mysql遠程訪問權限(去連接集群和windows sq服務)
USE?mysql; CREATE?USER?'root'@'%'?IDENTIFIED?BY?'密碼'; GRANT?ALL?ON?*.*?TO?'root'@'%'; ALTER?USER?'root'@'%'?IDENTIFIED?WITH?mysql_native_password?BY?'密碼'; FLUSH?PRIVILEGES;
5. Hadoop 3.2 版本需要 common-lang.jar 到lib目錄
6. mysql創建數據庫
CREATE?DATABASE?data?DEFAULT?CHARACTER?SET=?utf8?DEFAULT?COLLATE?=?utf8_general_ci; USE?data; CREATE?TABLE?top10{ ????dt?data, ????uid?varchar(255), ????length?bigint(20) };
7. sqoop將hdfs目錄的數據導出到mysql表中
sqoop?export?\ --connect?jdbc:mysql://windows的ip:3306/data?serverTimezone=UTC?\ --username?root?\ --password?admin?\ --table?top10?\ --export-dir?/res/videoinfojobtop10/20190821?\????#hdfs的路徑 --input-fields-terminated-by?"\t"
查看全部 -
!數據指標統計-直播時長Top
map階段獲取id和時長,reduce后cleanup函數對數據map集合進行排序
public?class?VideoInfoTop10Map?extends?Mapper<LongWritable,?Text,?Text,?LongWritable>{ ????@Override ????protected?void?map(){ ????????//Todo ????} } public?class?VideoInfoTop10Reduce?extends?Reducer<Text,?LongWritable,?Text,?LongWritable>{ ????HashMap<String,?Long>?map?=?new?HashMap<>(); ????@Override ????protected?void?reduce(){ ????????//TODO ????????map.put(k2.toSrting(),lengthsum); ????} ???? ????//reduce結束后執行 ????@Override ????protected?void?cleanup(Context?context){ ????????//配置類中獲取dt日期參數 ????????Configuration?conf?=?context.getConfiguration(); ????????String?dt?=?conf.get("dt"); ????????//排序 ????????Map<String,Long>?sortedMap?=?MapUtils.sortValue(map); ????????Set<Map.Entry<String,Long>>?entries?=?sortedMap.entrySet(); ????????Iterator<Map.Entry<String,?Long>>?it?=?entries.iterator(); ????????int?count=1; ????????while(count<=10?&&?it.hasNext()){ ????????????Map.Entry<String,?Long>?entry?=?it.next(); ????????????String?key?=?entry.getKey(); ????????????Long?value?=?entry.getValue(); ????????????//封裝k3,v3 ????????????Text?k3?=?new?Text(); ????????????k3.set(key); ????????????LongWritable?v3?=?new?LongWritable(); ????????????v3.set(value); ????????????context.write(k3,v3); ????????????count++; ????????} ????} } public?class?VideoInfoTop10Job{ ????public?static?void?main(String[]?args){ ????????//從輸入路徑獲取日期 ????????String[]?fields?=?args[0].split("/"); ????????String?tmpdt=?fields[fields.length?-1]; ????????String?dt?=?DataUtils.transDataFormat(tmpdt); ????????conf.set("dt",dt); ????????//因為context中存放conf信息↑ ????????//Todo ????} }
查看全部 -
!數據指標統計
//對金幣數量,總觀看pv,粉絲數量,視頻開播時長 等指標統計
//自定義數據類型 一個記錄管理四個字段
//主播id為key,map節點<k2,v2>為<Text,自定義Writable> //自定義數據類型 public?class?VideoInfoWritable?implements?Writable{ ????private?long?gold; ????private?long?watchnumpv; ????private?long?follower; ???? ????public?void?set(long?gold,?long?watchnumpv,?long?follower){ ????????this.gold=?gold; ????????this.watchnumpv=?watchnumpv; ????????this.follower=?follower; ????} ???? ????public?long?getGold(){ ????????return?gold; ????} ???? ????@Override ????public?void?readFields(DataInput?dataInput){ ????????this.gold=?dataInput.readLong(); ????????this.watchnumpv=?dataInput.readLong(); ????????this.follower=?dataInput.readLong(); ????} ????//讀寫數據順序保持一致! ????@Override ????public?void?write(DataOutput?dataOutput){ ????????dataOutput.writeLog(gold); ????????dataOutput.writeLog(watchnumpv); ????????dataOutput.writeLog(follower); ????} ???? ????//generate添加 ????//作為v3需要改下字段結構 ????@Override ????public?String?toString(){ ????????return?gold+"\t"+watchnumpv+"\t"+follower; ????} } public?class?VideoInfoMap?extend?Mapper<LongWritable,Text,Text,VideoInfoWritable>{ ????@Override ????protected?void?map(LongWritable?k1,?Text?v1,?Context?context){ ????????String?line?=?v1.toString(); ????????//用之前清洗后的數據 ????????String[]?fields?=?line.split("\t"); ????????String?id?=?fields[0]; ????????long?gold?=?Long.parseLong(fields[1]); ????????long?watchnumpv=?Long.parseLong(fields[2]); ????????long?follower?=?Long.parseLong(fields[3]); ???????? ????????//組裝k2,v2 ????????Text?k2?=?new?Text(); ????????k2.set(id); ????????VideoInfoWritable?v2?=??new?VideoInfoWritable(); ????????v2.set(gold,?watchnumpv,?follower); ????????Context.write(k2,?v2); ????} } public?class?VideoInfoReduce?extends?Reducer<Text,?VideoInfoWritable,?Text,?VideoWritable>{ ????@Override ????protected?void?reduce(Text?k2,?Iterable<VideoInfoWritable>?v2s,?Context?context){ ????????//從v2s把相同key的value取出,?求和 ????????long?goldsum=0; ????????long?watchnumpvsum=0; ????????long?followersum=0; ????????for(?VideoInfoWritable?v2:?v2s){ ????????????goldsum+=?v2.getGold(); ????????????watchnumpvsum?+=?v2.getWatchnumpv(); ????????????followersum?+=?v2.getFollower(); ????????} ????????//組裝?k3,?v3??進行聚合 ????????//Text?k3?=?k2; ????????VideoInfoWritable?v3?=?new?VideoInfoWritable(); ????????v3.set(goldsum,?watchnumpvsum,?followersum); ????????context.write(k3,?v3); ????} } public?class?VideoInfoJob{ ????//執行任務job ????//組裝map?reduce ????public?static?void?main(String[]?args){ ????????try{ ????????????if(args.length!=2){ ???????????? ????????????} ????????????Configuration?conf?=?new?Configuration; ????????????Job?job=?job.getInstance(conf); ????????????job.setJarByClass(VideoInfoJob.class); ????????????//文件輸入輸出 ????????????FileInputFormat ????????????FileOutputFormat ????????????//map ????????????job.setMapperClass ????????????//k2類型 ????????????job.setMapOutputKeyClass ????????????//v2類型 ????????????job.setMapOutpiyValueClass ????????????//reduce ????????????job.setReducerClass ????????????//k3 ????????????job.setReducerClass ????????????// ????????} ????} }
查看全部 -
數據清洗
//json格式數據提取 //需要fastjson對數據解析 //不需要聚合不需要reduce //k1,v1段固定<LongWritable,?Text> //k2,v2類型<Text,?Text>k2主播id,?v2核心字段,用\t分割 public?class?DataCleanMap?extend?Mapper<LongWritable,Text,Text,Text>{ ????@Override ????protected?void?map(LongWritable?k1,?Text?v1,?Context?context){ ????????String?line?=v1.toString(); ????????JSONObject?jsonObj=?JSON.parseObject(line); ????????String?id=?jsonObj.getString("uid"); ????????int?gold=?jsonObj.getString("gold"); ????????int?watchnumpv=?jsonObj.getString("watchnumpv"); ???????? ????????if(gold>=0?&&?watchnumpv?>=?0){ ????????????Text?k2?=?new?Text(); ????????????k2.set(id); ????????????Text?v2?=?new?Text(); ????????????v2.set(gold+?"\t"?+?watchnumpv); ????????????context.write(k2,?v2); ????????} ????} } public?class?DataCleanJob{ }
查看全部 -
文件->分片-> MAP開始-> 分區 -> 排序 -> 分組 -> MAP結束 -> shuffle -> (reduce端)全局排序 -> 分組 -> reduce
分區:不同數據類型分類
分組:相同k2分一組
map階段:
RecordReader類把每個InputSplit解析成<k1,v1>
一個InputSplit對應一個map task
框架對<k2,v2>分區,不同分區由不同reduce task處理,默認一個分區
map節點可以執行reduce歸約,為可選項 (Combine)
shuffle:
多個map任務輸出按照不同分區網絡拷貝不同reduce節點
reduce階段:
全局合并 排序 分組
reduce方法 輸入<k2,{v2...}> 輸出<k3,v3>
查看全部 -
split邏輯運算塊: 一個split對應一個mapper任務
K1, V1: K1是相對文本偏移量,V1代表該行文本
Shuffle:一個線程 將map產生結果拉取到reduce端做匯總
查看全部 -
分布式計算:? 將計算程序發送到本地 避免大數據傳輸
局部聚合 -> 數據傳輸(網絡I/O) ->整體聚合
查看全部
舉報