亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

在 Pyspark 中使用 udf 時出現 __getnewargs__ 錯誤

在 Pyspark 中使用 udf 時出現 __getnewargs__ 錯誤

慕虎7371278 2022-06-22 17:00:08
有一個包含 2 列(db 和 tb)的數據框:db 代表數據庫,tb 代表該數據庫的 tableName。   +--------------------+--------------------+   |            database|           tableName|   +--------------------+--------------------+   |aaaaaaaaaaaaaaaaa...|    tttttttttttttttt|   |bbbbbbbbbbbbbbbbb...|    rrrrrrrrrrrrrrrr|   |aaaaaaaaaaaaaaaaa...|  ssssssssssssssssss|我在python中有以下方法: def _get_tb_db(db, tb):      df = spark.sql("select * from {}.{}".format(db, tb))      return df.dtypes這個udf: test = udf(lambda db, tb: _get_tb_db(db, tb), StringType())運行時: df = df.withColumn("dtype", test(col("db"), col("tb")))有以下錯誤: pickle.PicklingError: Could not serialize object: Py4JError: An  error occurred while calling o58.__getnewargs__. Trace: py4j.Py4JException: Method __getnewargs__([]) does not exist我發現了一些關于 stackoverflow 的討論:Spark __getnewargs__ 錯誤 ,但我不確定如何解決這個問題?錯誤是因為我在 UDF 中創建了另一個數據框嗎?類似于鏈接中的解決方案,我嘗試了這個:       cols = copy.deepcopy(df.columns)       df = df.withColumn("dtype", scanning(cols[0], cols[1]))但仍然出現錯誤有什么解決辦法嗎?
查看完整描述

1 回答

?
蕭十郎

TA貢獻1815條經驗 獲得超13個贊

該錯誤意味著您不能在 UDF 中使用Spark 數據幀。但是由于您包含數據庫和表名稱的數據框很可能很小,因此只需使用 Pythonfor循環就足夠了,以下是一些可能有助于獲取數據的方法:


from pyspark.sql import Row


# assume dfs is the df containing database names and table names

dfs.printSchema()

root

 |-- database: string (nullable = true)

 |-- tableName: string (nullable = true)

方法一:使用 df.dtypes

運行 sqlselect * from database.tableName limit 1生成 df 并返回其 dtypes,將其轉換為 StringType()。


data = []

DRow = Row('database', 'tableName', 'dtypes')

for row in dfs.collect():

  try:

    dtypes = spark.sql('select * from `{}`.`{}` limit 1'.format(row.database, row.tableName)).dtypes

    data.append(DRow(row.database, row.tableName, str(dtypes)))

  except Exception, e:

    print("ERROR from {}.{}: [{}]".format(row.database, row.tableName, e))

    pass


df_dtypes = spark.createDataFrame(data)

# DataFrame[database: string, tableName: string, dtypes: string]

筆記:


使用dtypes而不是str(dtypes)將分別獲得以下模式 where_1和_2arecol_name和col_dtype:


root

 |-- database: string (nullable = true)

 |-- tableName: string (nullable = true)

 |-- dtypes: array (nullable = true)

 |    |-- element: struct (containsNull = true)

 |    |    |-- _1: string (nullable = true)

 |    |    |-- _2: string (nullable = true)

使用這種方法,每個表將只有一行。對于接下來的兩種方法,表的每個 col_type 都會有自己的行。


方法二:使用描述

您還可以通過運行spark.sql("describe tableName")直接獲取數據幀來檢索此信息,然后使用 reduce 函數來合并所有表的結果。


from functools import reduce


def get_df_dtypes(db, tb):

  try:

    return spark.sql('desc `{}`.`{}`'.format(db, tb)) \

                .selectExpr(

                      '"{}" as `database`'.format(db)

                    , '"{}" as `tableName`'.format(tb)

                    , 'col_name'

                    , 'data_type')

  except Exception, e:

    print("ERROR from {}.{}: [{}]".format(db, tb, e))

    pass


# an example table:

get_df_dtypes('default', 'tbl_df1').show()

+--------+---------+--------+--------------------+

|database|tableName|col_name|           data_type|

+--------+---------+--------+--------------------+

| default|  tbl_df1| array_b|array<struct<a:st...|

| default|  tbl_df1| array_d|       array<string>|

| default|  tbl_df1|struct_c|struct<a:double,b...|

+--------+---------+--------+--------------------+


# use reduce function to union all tables into one df

df_dtypes = reduce(lambda d1, d2: d1.union(d2), [ get_df_dtypes(row.database, row.tableName) for row in dfs.collect() ])

方法 3:使用 spark.catalog.listColumns()

使用 spark.catalog.listColumns() 創建collections.Column對象列表,檢索name和dataType合并數據。生成的數據框在它們自己的列上使用 col_name 和 col_dtype 進行標準化(與使用Method-2相同)。


data = []

DRow = Row('database', 'tableName', 'col_name', 'col_dtype')

for row in dfs.select('database', 'tableName').collect():

  try:

    for col in spark.catalog.listColumns(row.tableName, row.database):

      data.append(DRow(row.database, row.tableName, col.name, col.dataType))

  except Exception, e:

    print("ERROR from {}.{}: [{}]".format(row.database, row.tableName, e))

    pass


df_dtypes = spark.createDataFrame(data)

# DataFrame[database: string, tableName: string, col_name: string, col_dtype: string]

注意:在檢索元數據時,不同的 Spark 發行版/版本可能會產生不同的結果describe tbl_name和其他命令,請確保在查詢中使用正確的列名。


查看完整回答
反對 回復 2022-06-22
  • 1 回答
  • 0 關注
  • 480 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號