I am migrating some data in a table and trying to change the values of its "date" column, but PySpark seems to delete the data while reading it. I am performing the following steps:

1. Read the data from the table
2. Change the values of the column
3. Overwrite the data back to the same table

When I check the data after these steps, my table is empty. Here is my code:

table = "MY_TABLE"
data_input = sqlContext.read.format("jdbc").options(url=JDBCURL, dbtable=table).load()

print("data_input.count()=", data_input.count())
print("'2019' in data_input:", data_input.where(col("date").contains("2019")).count())
print("'YEAR' in data_input:", data_input.where(col("date").contains("YEAR")).count())
# data_input.count()= 1000
# '2019' in data_input: 1000
# 'YEAR' in data_input: 0

data_output = data_input.withColumn("date", F.regexp_replace("date", "2019", "YEAR"))

print("data_output.count()=", data_output.count())
print("'2019' in data_output:", data_output.where(col("date").contains("2019")).count())
print("'YEAR' in data_output:", data_output.where(col("date").contains("YEAR")).count())
# data_output.count()= 1000
# '2019' in data_output: 0
# 'YEAR' in data_output: 1000

So far so good. Now let's overwrite the table:

df_writer = DataFrameWriter(data_output)
df_writer.jdbc(url=JDBCURL, table=table, mode="overwrite")

# Let's check the data now
print("data_input.count()=", data_input.count())
print("'2019' in data_input:", data_input.where(col("date").contains("2019")).count())
print("'YEAR' in data_input:", data_input.where(col("date").contains("YEAR")).count())
# data_input.count()= 0
# '2019' in data_input: 0
# 'YEAR' in data_input: 0
# huh, weird

print("data_output.count()=", data_output.count())
print("'2019' in data_output:", data_output.where(col("date").contains("2019")).count())
print("'YEAR' in data_output:", data_output.where(col("date").contains("YEAR")).count())
# data_output.count()= 0
# '2019' in data_output: 0
# 'YEAR' in data_output: 0
# Still weird

The query SELECT * FROM MY_TABLE now returns 0 rows.

Why does [Py]Spark do this? How can I change this behavior? Is it caching? Is this explained anywhere in the documentation?
2 Answers

紅糖糍粑
I just ran into the same problem, and adding .cache() when reading the table fixed it for me. Spark evaluates lazily: the overwrite first truncates the table and only then runs the plan that reads from it, so without caching the job reads back an empty table.
data_input = sqlContext.read.format("jdbc").options(url=JDBCURL, dbtable=table).load().cache()
data_output = [ do something with data_input ]
data_output.write.jdbc(url=JDBCURL, table=table, mode="overwrite")
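One caveat worth noting: .cache() is itself lazy, so the cached copy is only populated when an action first runs on the DataFrame. To be safe, you can force materialization with an action such as .count() before the overwrite truncates the source table. A minimal sketch, reusing JDBCURL, table, and the transformation from the question:

from pyspark.sql import functions as F

# Read the source table and mark it for caching
data_input = sqlContext.read.format("jdbc").options(url=JDBCURL, dbtable=table).load().cache()

# Run an action so the cache is actually populated *before* the overwrite
# truncates the source table
print("rows cached:", data_input.count())

# Transform as in the question
data_output = data_input.withColumn("date", F.regexp_replace("date", "2019", "YEAR"))

# The write now pulls from the cached data, not from the table being truncated
data_output.write.jdbc(url=JDBCURL, table=table, mode="overwrite")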

慕尼黑5688855
I found a workaround by "caching" the DataFrame through a round trip to Pandas:
# Collect the result to the driver as a Pandas DataFrame; this fully
# materializes the data and severs the lineage back to the source table
data_pandas = data_output.toPandas()

# Rebuild a Spark DataFrame from the collected rows
data_spark = spark.createDataFrame(data_pandas)

# The overwrite no longer depends on re-reading the original table
data_spark.write.jdbc(url=JDBCURL, table=table, mode="overwrite")
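Note that the toPandas() round trip pulls the entire table onto the driver, so it only works while the data fits in driver memory. A lighter way to sever the lineage, sketched here as an alternative rather than something from the original answer, is to checkpoint the DataFrame so the result is eagerly materialized on the executors before the overwrite runs:

# localCheckpoint() eagerly computes and stores the partitions on the
# executors, cutting the plan's dependency on the JDBC source; use
# checkpoint() with sc.setCheckpointDir(...) for a more durable on-disk copy
data_checkpointed = data_output.localCheckpoint()
data_checkpointed.write.jdbc(url=JDBCURL, table=table, mode="overwrite")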