我每天都會收到一個 zip 存檔“2018-06-26.zip”,大小約為 . 250 Mb 壓縮,包含 165-170.000 個小的 XML 文件 (Kb's)。我將 zip-archive 加載到 HDFS(避免小文件問題),并使用 SPARK 從 zip 中提取它們(zip 不可拆分),制作一個 Paired RDD,以文件名為鍵,內容為值并保存通過成對的 RDD 將它們作為序列文件。使用一個僅包含 3 個用于測試目的的 XML 文件的小型 zip 存檔,一切都運行順暢,但是當我將其提供給大存檔時,我得到了 java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOf(Arrays.java:2367) at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130) ... ...我在 Cloudera Quickstart VM 上運行:CDH 5.13.3(HDFS:2.60,JDK:1.7.0.67,SPARK:1.6.0,Scala 2.10)我還沒有在成熟的集群上運行它,因為我想在部署它之前確保我的代碼是正確的......垃圾收集器在超出開銷限制的情況下繼續運行 OOM。我知道要增加驅動程序和 Java 堆空間的內存量,但我懷疑我的方法占用了太多內存.... 監視內存使用情況,雖然沒有發現任何內存泄漏....這是代碼:import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.sql.SQLContextimport java.util.zip.{ZipEntry, ZipInputStream}import org.apache.spark.input.PortableDataStreamimport scala.collection.mutableval conf = new SparkConf().setMaster("local").setAppName("ZipToSeq")val sc = new SparkContext(conf)val sqlContext = new SQLContext(sc)var xml_map = new mutable.HashMap[String, String]()sc.binaryFiles("/user/cloudera/test/2018-06-26.zip", 10).collect .foreach { zip_file : (String, PortableDataStream) => val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open) var zip_entry : ZipEntry = null while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) { if (!zip_entry.isDirectory) { val key_file_name = zip_entry.getName val value_file_content = scala.io.Source.fromInputStream(zip_stream, "iso-8859-1").getLines.mkString("\n") xml_map += ( key_file_name -> value_file_content ) } zip_stream.closeEntry() } zip_stream.close() }val xml_rdd = sc.parallelize(xml_map.toSeq).saveAsSequenceFile("/user/cloudera/2018_06_26")任何幫助或想法都受到高度贊賞。
添加回答
舉報
0/150
提交
取消