亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

使用spark DStream的foreachRDD時要注意哪些坑?

使用spark DStream的foreachRDD時要注意哪些坑?

眼眸繁星 2018-09-12 11:05:32
使用spark DStream的foreachRDD時要注意哪些坑
查看完整描述

1 回答

?
長風秋雁

TA貢獻1757條經驗 獲得超7個贊

兩個坑, 性能坑和線程坑
DStream是抽象類,它把連續的數據流拆成很多的小RDD數據塊, 這叫做“微批次”, spark的流式處理, 都是“微批次處理”。 DStream內部實現上有批次處理時間間隔,滑動窗口等機制來保證每個微批次的時間間隔里, 數據流以RDD的形式發送給spark做進一步處理。因此, 在一個為批次的處理時間間隔里, DStream只產生一個RDD。
可以利用dstream.foreachRDD把數據發送給外部系統。 但是想要正確地, 有效率的使用它, 必須理解一下背后的機制。通常向外部系統寫數據需要一個Connection對象(通過它與外部服務器交互)。程序員可能會想當然地在spark上創建一個connection對象, 然后在spark線程里用這個對象來存RDD。比如下面的程序:
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach { record =>
connection.send(record) // executed at the worker
}
}
這個代碼會產生執行錯誤, 因為rdd是分布式存儲的,它是一個數據結構,它是一組指向集群數據的指針, rdd.foreach會在集群里的不同機器上創建spark工作線程, 而connection對象則不會在集群里的各個機器之間傳遞, 所以有些spark工作線程就會產生connection對象沒有被初始化的執行錯誤。 解決的辦法可以是在spark worker里為每一個worker創建一個connection對象, 但是如果你這么做, 程序要為每一條record創建一次connection,顯然效率和性能都非常差。
另一種改進方法是為每個spark分區創建一個connection對象,同時維護一個全局的靜態的連接遲對象, 這樣就可以最好的復用connection。 另外需要注意: 雖然有多個connection對象, 但在同一時間只有一個connection.send(record)執行, 因為在同一個時間里, 只有 一個微批次的RDD產生出來。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
有人問了個問題,為什么foreachRDD里有兩層嵌套的foreach? 為什么dstream.foreachRDD里還要再套一層rdd.foreach
可以這么理解, DStream.foreachRDD 是一個輸出操作符,它返回的不是RDD里的一行數據, 而是輸出DStream后面的RDD,在一個時間間隔里, 只返回一個RDD的“微批次”, 為了訪問這個“微批次”RDD里的數據, 我們還需要在RDD數據對象上做進一步操作.。 參考下面的代碼實例, 更容易理解。
給頂一個 RDD [Security, Prices]數據結構
dstream.foreachRDD { pricesRDD => // Loop over RDD
val x= pricesRDD.count
if (x > 0) // RDD has data
{
for(line <- pricesRDD.collect.toArray) // Look for each record in the RDD
{
var index = line._2.split(',').view(0).toInt // That is the index
var timestamp = line._2.split(',').view(1).toString // This is the timestamp from source
var security = line._2.split(',').view(12.toString // This is the name of the security
var price = line._2.split(',').view(3).toFloat // This is the price of the security
if (price.toFloat > 90.0)
{
// Do something here
// Sent notification, write to HDFS etc
}
}
}
}



查看完整回答
反對 回復 2018-10-08
  • 1 回答
  • 0 關注
  • 1563 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號