3 Answers
Contributed 1777 pieces of experience, received 3+ upvotes
repartition:

    df.repartition(1)
      .write.format("com.databricks.spark.csv")
      .option("header", "true")
      .save("mydata.csv")

coalesce:

    df.coalesce(1)
      .write.format("com.databricks.spark.csv")
      .option("header", "true")
      .save("mydata.csv")

Either way, `mydata.csv` is a directory and the data ends up in `mydata.csv/part-00000`. If you need one plain file, combine the part files afterwards with `cat` or `hadoop fs -getmerge`.
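As a local sketch of what the `cat` / `hadoop fs -getmerge` step does (the directory and file names below are made up for illustration; Spark would normally create the part files):

```shell
# Spark writes the "CSV file" as a directory of part files; merging
# them into one file is just concatenation. Simulated locally:
mkdir -p /tmp/mydata.csv
printf 'id,name\n1,alice\n' > /tmp/mydata.csv/part-00000
printf '2,bob\n'            > /tmp/mydata.csv/part-00001

# Concatenate the parts, in order, into a single real CSV file
cat /tmp/mydata.csv/part-* > /tmp/mydata_single.csv

# On a cluster, the HDFS equivalent is:
#   hadoop fs -getmerge mydata.csv mydata_single.csv
```

Because only the first part file carries the header (when `header` is `true`), this simple concatenation works cleanly only if the other parts are header-less.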
Contributed 1874 pieces of experience, received 12+ upvotes
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs._

    def merge(srcPath: String, dstPath: String): Unit = {
      val hadoopConfig = new Configuration()
      val hdfs = FileSystem.get(hadoopConfig)
      FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
      // the "true" setting deletes the source files once they are merged into the new output
    }

    val newData = << create your dataframe >>

    val outputfile = "/user/feeds/project/outputs/subject"
    var filename = "myinsights"
    var outputFileName = outputfile + "/temp_" + filename
    var mergedFileName = outputfile + "/merged_" + filename
    var mergeFindGlob = outputFileName

    newData.write
      .format("com.databricks.spark.csv")
      .option("header", "false")
      .mode("overwrite")
      .save(outputFileName)

    merge(mergeFindGlob, mergedFileName)
    newData.unpersist()

Contributed 1824 pieces of experience, received 6+ upvotes
In short: use `coalesce(1)` or `repartition(1)` to write a single part file, or `FileUtil.copyMerge()` to combine the parts after the write. `coalesce(1)` forces the whole dataset through a single task, whereas `copyMerge()` lets the write stay parallel and merges the files afterwards.