3 回答
TA貢獻2016條經驗 獲得超9個贊
filter
def even(x): return x % 2 == 0def odd(x): return not even(x)rdd = sc.parallelize(range(20))rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
kv_rdd = rdd.map(lambda x: (x, odd(x)))kv_rdd.cache()rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()
星火轉換是懶惰的,直到您執行一個操作,您的rdd才會成為現實。 這有什么關系?回到我的例子: rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
如果以后我決定我只需要 rdd_odd那么就沒有理由去實現 rdd_even.如果您查看一下要計算的SAS示例 work.split2您需要同時實現輸入數據和 work.split1.RDDs提供了一個聲明性API。當你使用 filter或 map這完全取決于火花引擎如何執行這一操作。只要傳遞給轉換的函數是無副作用的,它就會為優化整個管道創造多種可能性。
randomSplit
partitionByDataFrameWriter
def makePairs(row: T): (String, String) = ???
data
.map(makePairs).toDF("key", "value")
.write.partitionBy($"key").format("text").save(...)RDD[T]=>RDD[T] RDD[T]=>RDD[U] (RDD[T],RDD[U])=>RDD[W]
TA貢獻1893條經驗 獲得超10個贊
PartitionerRangePartitioner.
val filtered = partitioned.mapPartitions { iter => {
new Iterator[Int](){
override def hasNext: Boolean = {
if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) {
false
} else {
iter.hasNext }
}
override def next():Int = iter.next()
}添加回答
舉報
