亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

函數編程中的reduce和foldLeft / fold之間的區別

函數編程中的reduce和foldLeft / fold之間的區別

大話西游666 2019-11-23 12:45:24
為什么Scala和Spark和Scalding等框架同時具有reduce和foldLeft?那么,reduce和之間有什么區別fold?
查看完整描述

3 回答

?
月關寶盒

TA貢獻1772條經驗 獲得超5個贊

foldApache Spark中的內容與fold未分發的集合中的內容不同。實際上,它需要交換函數才能產生確定性的結果:


這與以Scala之類的功能語言為非分布式集合實現的折疊操作有些不同。該折疊操作可以單獨應用于分區,然后將那些結果折疊為最終結果,而不是以某些定義的順序將折疊應用于每個元素。對于非交換函數,結果可能與應用于非分布式集合的折疊結果不同。


Mishael Rosenthal 已證明了這一點,Make42在其評論中建議了這一點。


有人建議觀察到的行為與HashPartitioner何時parallelize不洗牌和不使用有關HashPartitioner。


import org.apache.spark.sql.SparkSession


/* Note: standalone (non-local) mode */

val master = "spark://...:7077"  


val spark = SparkSession.builder.master(master).getOrCreate()


/* Note: deterministic order */

val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])

require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })


/* Note: all posible permutations */

require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)

解釋:


foldRDD的結構


def fold(zeroValue: T)(op: (T, T) => T): T = withScope {

  var jobResult: T

  val cleanOp: (T, T) => T

  val foldPartition = Iterator[T] => T

  val mergeResult: (Int, T) => Unit

  sc.runJob(this, foldPartition, mergeResult)

  jobResult

}

與RDD的結構reduce相同:


def reduce(f: (T, T) => T): T = withScope {

  val cleanF: (T, T) => T

  val reducePartition: Iterator[T] => Option[T]

  var jobResult: Option[T]

  val mergeResult =  (Int, Option[T]) => Unit

  sc.runJob(this, reducePartition, mergeResult)

  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))

}

在runJob不考慮分區順序的情況下執行,導致需要交換功能。


foldPartition并且reducePartition在處理順序上有效,reduceLeft并且foldLeft在上有效執行(通過繼承和委派)TraversableOnce。


結論:foldRDD不能依賴于塊的順序,而是需要可交換性和關聯性。


查看完整回答
反對 回復 2019-11-23
  • 3 回答
  • 0 關注
  • 1076 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號