1 回答

TA貢獻1815條經驗 獲得超6個贊
我不確切知道該方法如何binaryFiles跨 Spark 分區對文件進行分區。似乎與此相反textFiles,它傾向于只創建一個分區。讓我們看看一個名為 的示例目錄dir,其中包含 5 個文件。
> ls dir
test1 test2 test3 test4 test5
如果我使用textFile,事情就會并行運行。我不提供輸出,因為它不是很漂亮,但你可以自己檢查。我們可以驗證事物是否與 并行運行getNumPartitions。
>>> sc.textFile("dir").foreach(lambda x: some_function(x, None))
# ugly output, but everything starts at the same time,
# except maybe the last one since you have 4 cores.
>>> sc.textFile("dir").getNumPartitions()
5
由于binaryFiles情況不同,并且由于某種原因,所有內容都進入同一個分區。
>>> sc.binaryFiles("dir").getNumPartitions()
1
我什至嘗試使用 10k 個文件,所有內容仍然位于同一分區。我相信這背后的原因是,在scala中,binaryFiles返回一個帶有文件名的RDD和一個允許讀取文件的對象(但不執行讀?。?。因此速度很快,并且生成的 RDD 很小。因此,將其放在一個分區上就可以了。在 scala 中,我們可以在使用后使用重新分區binaryFiles,一切都會很好。
scala> sc.binaryFiles("dir").getNumPartitions
1
scala> sc.binaryFiles("dir").repartition(4).getNumPartitions
4
scala> sc.binaryFiles("dir").repartition(4)
.foreach{ case (name, ds) => {
println(System.currentTimeMillis+": "+name)
Thread.sleep(2000)
// do some reading on the DataStream ds
}}
1603352918396: file:/home/oanicol/sandbox/dir/test1
1603352918396: file:/home/oanicol/sandbox/dir/test3
1603352918396: file:/home/oanicol/sandbox/dir/test4
1603352918396: file:/home/oanicol/sandbox/dir/test5
1603352920397: file:/home/oanicol/sandbox/dir/test2
python 中的問題是binaryFiles實際上將文件讀取到一個分區上。另外,這對我來說非常神秘,但是 pyspark 2.4 中的以下代碼行會產生與您注意到的相同的行為,這是沒有意義的。
# this should work but does not
sc.binaryFiles("dir", minPartitions=4).foreach(lambda x: some_function(x, ''))
# this does not work either, which is strange but it would not be advised anyway
# since all the data would be read on one partition
sc.binaryFiles("dir").repartition(4).foreach(lambda x: some_function(x, ''))
然而,由于binaryFiles實際讀取文件,您可以使用wholeTextFile它將文件作為文本文件讀取并按預期運行:
# this works
sc.wholeTextFiles("dir", minPartitions=4).foreach(lambda x: some_function(x, ''))
添加回答
舉報