亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何將RDD拆分為兩個或多個RDD?

如何將RDD拆分為兩個或多個RDD?

如何將RDD拆分為兩個或多個RDD?我正在尋找一種將RDD分割成兩個或多個RDD的方法。我最近看到的是ScalaSPark:將集合拆分成幾個RDD?仍然是一個單一的RDD。如果您熟悉SAS,如下所示:data work.split1, work.split2;     set work.preSplit;     if (condition1)         output work.split1     else if (condition2)         output work.split2 run;這就產生了兩個不同的數據集。必須立即堅持才能得到我想要的結果.。
查看完整描述

3 回答

?
慕沐林林

TA貢獻2016條經驗 獲得超9個贊

不可能從單個轉換*生成多個RDDs。如果要拆分RDD,則必須應用filter對于每個分裂的條件。例如:

def even(x): return x % 2 == 0def odd(x): return not even(x)rdd = sc.parallelize(range(20))rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))

如果您只有二進制條件,而且計算成本很高,您可能更喜歡這樣的東西:

kv_rdd = rdd.map(lambda x: (x, odd(x)))kv_rdd.cache()rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()

它只意味著一個謂詞計算,但需要對所有數據進行額外的傳遞。

需要注意的是,只要輸入rdd被正確地緩存,并且沒有關于數據分布的附加假設,在重復過濾器和嵌套if-倒換循環之間的時間復雜度方面沒有顯著差異。

對于N個元素和M條件,您必須執行的操作數顯然與N乘以M成正比。如果是for-循環,它應該更接近于(N+MN)/2,重復濾波器正好是NM,但在一天結束時,它只不過是O(NM)。你可以看到我的討論*詹森·倫德曼讀一些正反兩方面的文章。

在非常高的層次上,您應該考慮兩件事:

  1. 星火轉換是懶惰的,直到您執行一個操作,您的rdd才會成為現實。

    這有什么關系?回到我的例子:

    rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))

    如果以后我決定我只需要rdd_odd那么就沒有理由去實現rdd_even.

    如果您查看一下要計算的SAS示例work.split2您需要同時實現輸入數據和work.split1.

  2. RDDs提供了一個聲明性API。當你使用filtermap這完全取決于火花引擎如何執行這一操作。只要傳遞給轉換的函數是無副作用的,它就會為優化整個管道創造多種可能性。

到頭來,這起案件還不夠特殊,不足以證明它本身的轉變是合理的。

這個帶有過濾器圖案的地圖實際上是在一個核心星火中使用的。見我對Sparks RDD.隨機Split實際上是如何分割RDD的?和一個相關部分.的.randomSplit方法。

如果唯一的目標是實現輸入的分割,則可以使用partitionBy條款DataFrameWriter哪種文本輸出格式:

def makePairs(row: T): (String, String) = ???

data
  .map(makePairs).toDF("key", "value")
  .write.partitionBy($"key").format("text").save(...)

*星火只有三種基本類型的轉換:

  • RDD[T]=>RDD[T]
  • RDD[T]=>RDD[U]
  • (RDD[T],RDD[U])=>RDD[W]

其中T,U,W可以是原子類型,也可以是原子類型產品/元組(K,V)。任何其他操作都必須使用上述的某種組合來表示。你可以檢查原始RDD論文更多細節。

** http:/chat.stackoverflow

*另見ScalaSPark:將集合拆分成幾個RDD?


查看完整回答
反對 回復 2019-07-20
?
白豬掌柜的

TA貢獻1893條經驗 獲得超10個贊

一種方法是使用自定義分區程序根據篩選條件對數據進行分區。這可以通過擴展Partitioner并實現類似于RangePartitioner.

然后,可以使用映射分區從分區RDD構造多個RDD,而無需讀取所有數據。

val filtered = partitioned.mapPartitions { iter => {

  new Iterator[Int](){
    override def hasNext: Boolean = {
      if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) {
        false
      } else {
        iter.hasNext      }
    }

    override def next():Int = iter.next()
  }

請注意,篩選的RDD中的分區數將與分區RDD中的分區數相同,因此應該使用合并來減少這一點,并刪除空分區。


查看完整回答
反對 回復 2019-07-20
  • 3 回答
  • 0 關注
  • 4309 瀏覽

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號