The upside of this layout is that dropping a partition is just a matter of deleting its directory. The downside is that, with the approach above, Structured Streaming does not allow the output directory to change: once the path has been resolved it stays fixed for the lifetime of the query, so all data keeps landing in the partition for the day the job was started.
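A minimal sketch of that behaviour, assuming a streaming DataFrame df and reusing the path and checkpoint location from the example further down:

import org.joda.time.DateTime

// The date is interpolated once, when the query is defined, and never again.
val outputPath = s"/tmp/jack/hp_date=${new DateTime().toString("yyyy-MM-dd")}"

val query = df.writeStream                        // df: the streaming DataFrame (assumed)
  .format("parquet")
  .option("path", outputPath)                     // stays fixed for the lifetime of the query
  .option("checkpointLocation", "/tmp/cpl2")
  .outputMode("append")
  .start()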
Solution
The fix is to implement a parquet sink of our own, and only a few things need to change. Add a new class (it sits under the org.apache.spark.sql.execution.streaming.newfile package, like the DefaultSource shown later, so that it can reach Spark's internal streaming APIs):
class NewFileStreamSink(
    sparkSession: SparkSession,
    _path: String,
    fileFormat: FileFormat,
    partitionColumnNames: Seq[String],
    options: Map[String, String]) extends Sink with Logging {

  // Use the Velocity template engine, which makes complex template rendering easy.
  def evaluate(value: String, context: Map[String, AnyRef]) = {
    RenderEngine.render(value, context)
  }

  // Turn path resolution into a method call, so that every write re-evaluates the
  // template and gets a fresh value.
  def path = {
    evaluate(_path, Map("date" -> new DateTime()))
  }

  // All of these path-related members have to become methods as well.
  private def basePath = new Path(path)
  private def logPath = new Path(basePath, FileStreamSink.metadataDir)
  private def fileLog =
    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)

  private val hadoopConf = sparkSession.sessionState.newHadoopConf()

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
      logInfo(s"Skipping already committed batch $batchId")
    } else {
      val committer = FileCommitProtocol.instantiate(
        className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
        jobId = batchId.toString,
        outputPath = path,
        isAppend = false)

      committer match {
        case manifestCommitter: ManifestFileCommitProtocol =>
          manifestCommitter.setupManifestOptions(fileLog, batchId)
        case _ => // Do nothing
      }

      FileFormatWriter.write(
        sparkSession = sparkSession,
        queryExecution = data.queryExecution,
        fileFormat = fileFormat,
        committer = committer,
        outputSpec = FileFormatWriter.OutputSpec(path, Map.empty),
        hadoopConf = hadoopConf,
        partitionColumnNames = partitionColumnNames,
        bucketSpec = None,
        refreshFunction = _ => (),
        options = options)
    }
  }

  override def toString: String = s"FileSink[$path]"
}
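RenderEngine above is the author's Velocity-backed helper and is not shown in the post; a minimal sketch of what it could look like, assuming Apache Velocity and joda-time are on the classpath:

import java.io.StringWriter
import org.apache.velocity.VelocityContext
import org.apache.velocity.app.VelocityEngine

// Hypothetical stand-in for the author's RenderEngine.
object RenderEngine {
  private val engine = new VelocityEngine()
  engine.init()

  // Renders a Velocity template such as /tmp/jack/hp_date=${date.toString("yyyy-MM-dd")}
  // against a context map, e.g. Map("date" -> new DateTime()).
  def render(template: String, context: Map[String, AnyRef]): String = {
    val javaContext = new java.util.HashMap[String, AnyRef]()
    context.foreach { case (k, v) => javaContext.put(k, v) }
    val writer = new StringWriter()
    engine.evaluate(new VelocityContext(javaContext), writer, "RenderEngine", template)
    writer.toString
  }
}

With something like this in place, ${date.toString("yyyy-MM-dd")} in the path template is re-rendered with the current DateTime every time path is called, i.e. once per micro-batch.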
With the sink implemented, we still need a DataSource so that the new sink can be plugged into Spark and used from the outside:
package org.apache.spark.sql.execution.streaming.newfile

import org.apache.spark.sql.{AnalysisException, SQLContext}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

class DefaultSource extends StreamSinkProvider {
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = {
    val path = parameters.getOrElse("path", {
      throw new IllegalArgumentException("'path' is not specified")
    })
    if (outputMode != OutputMode.Append) {
      throw new AnalysisException(
        s"Data source ${getClass.getCanonicalName} does not support $outputMode output mode")
    }
    new NewFileStreamSink(
      sqlContext.sparkSession,
      parameters("path"),
      new ParquetFileFormat(),
      partitionColumns,
      parameters)
  }
}
This follows the standard data source API. Usage now looks like this:
save append table21
-- the path template uses JodaTime's formatting syntax
as parquet.`/tmp/jack/hp_date=${date.toString("yyyy-MM-dd")}`
options mode="Append"
and duration="10"
-- point implClass at the new implementation
and implClass="org.apache.spark.sql.execution.streaming.newfile"
and checkpointLocation="/tmp/cpl2";
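Because DefaultSource implements the standard StreamSinkProvider interface, the sink can also be driven directly from the DataStreamWriter API without the SQL layer above. A hedged sketch, where df and the trigger interval are assumptions mirroring duration="10":

import org.apache.spark.sql.streaming.Trigger

// df is assumed to be a streaming DataFrame produced by spark.readStream...
val query = df.writeStream
  .format("org.apache.spark.sql.execution.streaming.newfile")               // package that holds DefaultSource
  .option("path", """/tmp/jack/hp_date=${date.toString("yyyy-MM-dd")}""")   // raw template, rendered per batch by the sink
  .option("checkpointLocation", "/tmp/cpl2")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("10 seconds"))                            // rough equivalent of duration="10"
  .start()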
Quite convenient, isn't it?
An additional issue
After Spark 2.2.0, Spark adjusted how the sink's metadata files are compacted: if, during compaction, the file for an earlier checkpoint turns out to be missing, an exception is thrown. In Spark 2.2.0 itself this problem does not exist. The Spark team would arguably do better to make this configurable, letting users choose between throwing the exception and staying silent.
Author: 祝威廉
Link: https://www.jianshu.com/p/0fa3895dcd27