1 回答

TA貢獻1815條經驗 獲得超13個贊
該錯誤意味著您不能在 UDF 中使用Spark 數據幀。但是由于您包含數據庫和表名稱的數據框很可能很小,因此只需使用 Pythonfor循環就足夠了,以下是一些可能有助于獲取數據的方法:
from pyspark.sql import Row
# assume dfs is the df containing database names and table names
dfs.printSchema()
root
|-- database: string (nullable = true)
|-- tableName: string (nullable = true)
方法一:使用 df.dtypes
運行 sqlselect * from database.tableName limit 1生成 df 并返回其 dtypes,將其轉換為 StringType()。
data = []
DRow = Row('database', 'tableName', 'dtypes')
for row in dfs.collect():
try:
dtypes = spark.sql('select * from `{}`.`{}` limit 1'.format(row.database, row.tableName)).dtypes
data.append(DRow(row.database, row.tableName, str(dtypes)))
except Exception, e:
print("ERROR from {}.{}: [{}]".format(row.database, row.tableName, e))
pass
df_dtypes = spark.createDataFrame(data)
# DataFrame[database: string, tableName: string, dtypes: string]
筆記:
使用dtypes而不是str(dtypes)將分別獲得以下模式 where_1和_2arecol_name和col_dtype:
root
|-- database: string (nullable = true)
|-- tableName: string (nullable = true)
|-- dtypes: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _1: string (nullable = true)
| | |-- _2: string (nullable = true)
使用這種方法,每個表將只有一行。對于接下來的兩種方法,表的每個 col_type 都會有自己的行。
方法二:使用描述
您還可以通過運行spark.sql("describe tableName")直接獲取數據幀來檢索此信息,然后使用 reduce 函數來合并所有表的結果。
from functools import reduce
def get_df_dtypes(db, tb):
try:
return spark.sql('desc `{}`.`{}`'.format(db, tb)) \
.selectExpr(
'"{}" as `database`'.format(db)
, '"{}" as `tableName`'.format(tb)
, 'col_name'
, 'data_type')
except Exception, e:
print("ERROR from {}.{}: [{}]".format(db, tb, e))
pass
# an example table:
get_df_dtypes('default', 'tbl_df1').show()
+--------+---------+--------+--------------------+
|database|tableName|col_name| data_type|
+--------+---------+--------+--------------------+
| default| tbl_df1| array_b|array<struct<a:st...|
| default| tbl_df1| array_d| array<string>|
| default| tbl_df1|struct_c|struct<a:double,b...|
+--------+---------+--------+--------------------+
# use reduce function to union all tables into one df
df_dtypes = reduce(lambda d1, d2: d1.union(d2), [ get_df_dtypes(row.database, row.tableName) for row in dfs.collect() ])
方法 3:使用 spark.catalog.listColumns()
使用 spark.catalog.listColumns() 創建collections.Column對象列表,檢索name和dataType合并數據。生成的數據框在它們自己的列上使用 col_name 和 col_dtype 進行標準化(與使用Method-2相同)。
data = []
DRow = Row('database', 'tableName', 'col_name', 'col_dtype')
for row in dfs.select('database', 'tableName').collect():
try:
for col in spark.catalog.listColumns(row.tableName, row.database):
data.append(DRow(row.database, row.tableName, col.name, col.dataType))
except Exception, e:
print("ERROR from {}.{}: [{}]".format(row.database, row.tableName, e))
pass
df_dtypes = spark.createDataFrame(data)
# DataFrame[database: string, tableName: string, col_name: string, col_dtype: string]
注意:在檢索元數據時,不同的 Spark 發行版/版本可能會產生不同的結果describe tbl_name和其他命令,請確保在查詢中使用正確的列名。
添加回答
舉報