整体思路是通过partition并行链接关系数据库。
实现:
1. 加载驱动程序
正确配置:
--driver-class-path "driver_local_file_system_jdbc_driver1.jar:driver_local_file_system_jdbc_driver2.jar" --class "spark.executor.extraClassPath=executors_local_file_system_jdbc_driver1.jar:executors_local_file_system_jdbc_driver2.jar"
如果需要在NoteBook中执行任务,需要在启动前设置EXTRA_CLASSPATH,执行如下命令:
export EXTRA_CLASSPATH=path_to_the_first_jar:path_to_the_second_jar
2. 并行加载
有两种方式:
1)按照指定列进行统一分区
2)通过用户自定义谓词分区
按照指定列进行统一分区
指定列必须是数字类型
使用方法
sqlctx.read.jdbc(url = "<URL>", table = "<TABLE>", columnName = "<INTEGRAL_COLUMN_TO_PARTITION>", lowerBound = minValue, upperBound = maxValue, numPartitions = 20, connectionProperties = new java.util.Properties() )
通过用户自定义谓词分区
使用方法
val predicates = Array("2015-06-20" -> "2015-06-30", "2015-07-01" -> "2015-07-10", "2015-07-11" -> "2015-07-20", "2015-07-21" -> "2015-07-31").map { case (start, end) => s"cast(DAT_TME as date) >= date '$start' " + "AND cast(DAT_TME as date) <= date '$end'"} sqlctx.read.jdbc(url = "<URL>", table = "<TABLE>", predicates = predicates, connectionProperties = new java.util.Properties())
3.表格union
def readTable(table: String): DataFrameList("<TABLE1>", "<TABLE2>", "<TABLE3>").par.map(readTable).reduce(_ unionAll _)
.par 表示readTable函数会并行调用,而不是线性顺序。
4.映射为Case Class
case class MyClass(a: Long, b: String, c: Int, d: String, e: String)dataframe.map { case Row(a: java.math.BigDecimal, b: String, c: Int, _: String, _: java.sql.Date, e: java.sql.Date, _: java.sql.Timestamp, _: java.sql.Timestamp, _: java.math.BigDecimal, _: String) => MyClass(a = a.longValue(), b = b, c = c, d = d.toString, e = e.toString) }
不可以处理包含null值的记录。可以通过
dataframe.na.drop()
通过处理后,丢弃包含null的记录。
作者:jacksu在简书
链接:https://www.jianshu.com/p/a0cb577a4586
點擊查看更多內容
為 TA 點贊
評論
評論
共同學習,寫下你的評論
評論加載中...
作者其他優質文章
正在加載中
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦