3 回答

TA貢獻1772條經驗 獲得超5個贊
foldApache Spark中的內容與fold未分發的集合中的內容不同。實際上,它需要交換函數才能產生確定性的結果:
這與以Scala之類的功能語言為非分布式集合實現的折疊操作有些不同。該折疊操作可以單獨應用于分區,然后將那些結果折疊為最終結果,而不是以某些定義的順序將折疊應用于每個元素。對于非交換函數,結果可能與應用于非分布式集合的折疊結果不同。
Mishael Rosenthal 已證明了這一點,Make42在其評論中建議了這一點。
有人建議觀察到的行為與HashPartitioner何時parallelize不洗牌和不使用有關HashPartitioner。
import org.apache.spark.sql.SparkSession
/* Note: standalone (non-local) mode */
val master = "spark://...:7077"
val spark = SparkSession.builder.master(master).getOrCreate()
/* Note: deterministic order */
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])
require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })
/* Note: all posible permutations */
require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)
解釋:
foldRDD的結構
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
var jobResult: T
val cleanOp: (T, T) => T
val foldPartition = Iterator[T] => T
val mergeResult: (Int, T) => Unit
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
與RDD的結構reduce相同:
def reduce(f: (T, T) => T): T = withScope {
val cleanF: (T, T) => T
val reducePartition: Iterator[T] => Option[T]
var jobResult: Option[T]
val mergeResult = (Int, Option[T]) => Unit
sc.runJob(this, reducePartition, mergeResult)
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
在runJob不考慮分區順序的情況下執行,導致需要交換功能。
foldPartition并且reducePartition在處理順序上有效,reduceLeft并且foldLeft在上有效執行(通過繼承和委派)TraversableOnce。
結論:foldRDD不能依賴于塊的順序,而是需要可交換性和關聯性。
- 3 回答
- 0 關注
- 1076 瀏覽
添加回答
舉報