3 回答

TA貢獻1799條經驗 獲得超9個贊
如果運行repartition(COL)
,則在計算過程中更改分區-您將獲得spark.sql.shuffle.partitions
(默認值:200)分區。如果您隨后致電,.write
您將獲得一個包含許多文件的目錄。
如果運行,.write.partitionBy(COL)
則結果將獲得與COL中的唯一值一樣多的目錄。這樣可以加快進一步的數據讀取速度(如果您按分區列進行過濾),并節省了一些存儲空間(分區列已從數據文件中刪除)。
更新:參見@conradlee的答案。他不僅詳細說明了應用不同方法后的目錄結構,而且還解釋了兩種情況下文件的數量。

TA貢獻1871條經驗 獲得超8個贊
repartition()用于對內存中的數據進行分區,并partitionBy用于對磁盤上的數據進行分區。如本博客文章所述,它們通常結合使用。
二者repartition()并partitionBy可以用于“基于數據幀列分區數據”,但repartition()在存儲分區中的數據和partitionBy分區在磁盤上的數據。
repartition()
讓我們玩一些代碼以更好地了解分區。假設您具有以下CSV數據。
first_name,last_name,country
Ernesto,Guevara,Argentina
Vladimir,Putin,Russia
Maria,Sharapova,Russia
Bruce,Lee,China
Jack,Ma,China
df.repartition(col("country")) 將按內存中的國家/地區對數據進行分區。
讓我們寫出數據,以便我們可以檢查每個內存分區的內容。
val outputPath = new java.io.File("./tmp/partitioned_by_country/").getCanonicalPath
df.repartition(col("country"))
.write
.csv(outputPath)
這是將數據寫到磁盤上的方法:
partitioned_by_country/
part-00002-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
part-00044-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
part-00059-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
每個文件都包含一個國家/ part-00059-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv地區的數據-該文件包含以下中國數據,例如:
Bruce,Lee,China
Jack,Ma,China
partitionBy()
讓我們將數據寫到磁盤上partitionBy,看看文件系統輸出如何不同。
這是將數據寫到磁盤分區的代碼。
val outputPath = new java.io.File("./tmp/partitionedBy_disk/").getCanonicalPath
df
.write
.partitionBy("country")
.csv(outputPath)
磁盤上的數據如下所示:
partitionedBy_disk/
country=Argentina/
part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000.csv
country=China/
part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000
country=Russia/
part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000
為什么要對磁盤上的數據進行分區?
如本博文所述,對磁盤上的數據進行分區可以使某些查詢運行得更快。
添加回答
舉報