亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Spark 未使用二進制文件在并行 Pyspark 中運行 RDD

Spark 未使用二進制文件在并行 Pyspark 中運行 RDD

汪汪一只貓 2023-12-26 15:14:41
我是 Spark 的新手,開始用 Python 編寫一些腳本。我的理解是Spark并行執行Transformation(map)。def some_function(name, content):    print(name, datetime.now())    time.sleep(30)    return contentconfig = SparkConf().setAppName("sample2").setMaster("local[*]")filesRDD = SparkContext(conf=config).binaryFiles("F:\\usr\\temp\\*.zip")inputfileRDD = filesRDD.map(lambda job_bundle: (job_bundle[0], some_function(job_bundle[0], job_bundle[1])))print(inputfileRDD.collect())上面的代碼.zip從文件夾中收集文件列表并對其進行處理。當我執行它時,我看到這是按順序發生的。輸出file:/F:/usr/temp/sample.zip 2020-10-22 10:42:37.089085file:/F:/usr/temp/sample1.zip 2020-10-22 10:43:07.103317您可以看到它在 30 秒后開始處理第二個文件。意思是完成第一個文件后。我的代碼出了什么問題?為什么它不并行執行RDD?你能幫我么 ?
查看完整描述

1 回答

?
紅糖糍粑

TA貢獻1815條經驗 獲得超6個贊

我不確切知道該方法如何binaryFiles跨 Spark 分區對文件進行分區。似乎與此相反textFiles,它傾向于只創建一個分區。讓我們看看一個名為 的示例目錄dir,其中包含 5 個文件。


> ls dir

test1  test2  test3  test4  test5

如果我使用textFile,事情就會并行運行。我不提供輸出,因為它不是很漂亮,但你可以自己檢查。我們可以驗證事物是否與 并行運行getNumPartitions。


>>> sc.textFile("dir").foreach(lambda x: some_function(x, None))

# ugly output, but everything starts at the same time,

# except maybe the last one since you have 4 cores.

>>> sc.textFile("dir").getNumPartitions()

5

由于binaryFiles情況不同,并且由于某種原因,所有內容都進入同一個分區。


>>> sc.binaryFiles("dir").getNumPartitions()

1

我什至嘗試使用 10k 個文件,所有內容仍然位于同一分區。我相信這背后的原因是,在scala中,binaryFiles返回一個帶有文件名的RDD和一個允許讀取文件的對象(但不執行讀?。?。因此速度很快,并且生成的 RDD 很小。因此,將其放在一個分區上就可以了。在 scala 中,我們可以在使用后使用重新分區binaryFiles,一切都會很好。


scala> sc.binaryFiles("dir").getNumPartitions

1

scala> sc.binaryFiles("dir").repartition(4).getNumPartitions

4

scala> sc.binaryFiles("dir").repartition(4)

    .foreach{ case (name, ds) => { 

        println(System.currentTimeMillis+": "+name)

        Thread.sleep(2000)

        // do some reading on the DataStream ds

    }}

1603352918396: file:/home/oanicol/sandbox/dir/test1

1603352918396: file:/home/oanicol/sandbox/dir/test3

1603352918396: file:/home/oanicol/sandbox/dir/test4

1603352918396: file:/home/oanicol/sandbox/dir/test5

1603352920397: file:/home/oanicol/sandbox/dir/test2

python 中的問題是binaryFiles實際上將文件讀取到一個分區上。另外,這對我來說非常神秘,但是 pyspark 2.4 中的以下代碼行會產生與您注意到的相同的行為,這是沒有意義的。


# this should work but does not

sc.binaryFiles("dir", minPartitions=4).foreach(lambda x: some_function(x, ''))

# this does not work either, which is strange but it would not be advised anyway

# since all the data would be read on one partition

sc.binaryFiles("dir").repartition(4).foreach(lambda x: some_function(x, ''))

然而,由于binaryFiles實際讀取文件,您可以使用wholeTextFile它將文件作為文本文件讀取并按預期運行:


# this works

sc.wholeTextFiles("dir", minPartitions=4).foreach(lambda x: some_function(x, ''))


查看完整回答
反對 回復 2023-12-26
  • 1 回答
  • 0 關注
  • 164 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號