2 回答

TA貢獻1783條經驗 獲得超4個贊
下面是一個示例,說明如何測試拋出異常的 PySpark 函數。在此示例中,我們將驗證如果排序順序為 則拋出異常"cats"。
def it_throws_an_error_if_the_sort_order_is_invalid(spark):
? ? source_df = spark.create_df(
? ? ? ? [
? ? ? ? ? ? ("jose", "oak", "switch"),
? ? ? ? ? ? ("li", "redwood", "xbox"),
? ? ? ? ? ? ("luisa", "maple", "ps4"),
? ? ? ? ],
? ? ? ? [
? ? ? ? ? ? ("name", StringType(), True),
? ? ? ? ? ? ("tree", StringType(), True),
? ? ? ? ? ? ("gaming_system", StringType(), True),
? ? ? ? ]
? ? )
? ? with pytest.raises(ValueError) as excinfo:
? ? ? ? quinn.sort_columns(source_df, "cats")
? ? assert excinfo.value.args[0] == "['asc', 'desc'] are the only valid sort orders and you entered a sort order of 'cats'"
請注意,該測試正在驗證所提供的特定錯誤消息。
您可以向您的rename_columnsName
函數提供無效輸入并驗證錯誤消息是否符合您的預期。

TA貢獻1735條經驗 獲得超5個贊
我找到了這個問題的解決方案,我們可以像 python 一樣在 Pyspark 中處理異常。例如:
def rename_columnsName(df, columns):#provide names in dictionary format
try:
if isinstance(columns, dict):
for old_name, new_name in columns.items():
df = df.withColumnRenamed(old_name, new_name)
return df.show()
else:
raise ValueError("'columns' should be a dict, like {'old_name':'new_name',
'old_name_one more':'new_name_1'}")
except Exception as e:
print(e)
添加回答
舉報