亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何在 Pyspark 中計算或管理流數據

如何在 Pyspark 中計算或管理流數據

慕容3067478 2022-12-20 12:25:58
我想從流數據中計算數據,然后發送到網頁。例如:我將計算流數據中 TotalSales 列的總和。但它出錯了summary = dataStream.select('TotalSales').groupby().sum().toPandas(),這是我的代碼。import osfrom pyspark.sql import SparkSessionfrom pyspark.sql.types import StructTypefrom pyspark.sql.functions import *spark = SparkSession.builder.appName("Python Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()schema = StructType().add("_c0", "integer").add("InvoiceNo", "string").add("Quantity","integer").add("InvoiceDate","date").add("UnitPrice","integer").add("CustomerID","double").add("TotalSales","integer")INPUT_DIRECTORY = "C:/Users/HP/Desktop/test/jsonFile"dataStream = spark.readStream.format("json").schema(schema).load(INPUT_DIRECTORY)query = dataStream.writeStream.format("console").start()summary = dataStream.select('TotalSales').groupby().sum().toPandas()print(query.id)query.awaitTermination();這是命令行上顯示的錯誤。Traceback (most recent call last):  File "testStreaming.py", line 12, in <module>    dataStream = dataStream.toPandas()  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\pyspark\sql\dataframe.py", line 2150, in toPandas    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\pyspark\sql\dataframe.py", line 534, in collect    sock_info = self._jdf.collectToPython()  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__answer, self.gateway_client, self.target_id, self.name)  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\pyspark\sql\utils.py", line 69, in decoraise AnalysisException(s.split(': ', 1)[1], stackTrace)pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nFileSource[C:/Users/HP/Desktop/test/jsonFile]'感謝您的回答。
查看完整描述

1 回答

?
滄海一幻覺

TA貢獻1824條經驗 獲得超5個贊

你為什么要創建 aa pandas Df


toPandas 將創建一個在您的驅動程序節點本地的 DataFrame。我不確定你想在這里實現什么。Pandas DataFrame 表示一組固定的元組,其中結構化流是連續的數據流。


現在這個問題的一個可能的解決方案是完成你想要做的整個過程并將輸出發送到 parquet/csv 文件并使用這個 parquet/csv 等文件創建一個 pandas DF 。


summary = dataStream.select('TotalSales').groupby().sum()

query = dataStream.writeStream.format("parquet").outputMode("complete").start(outputPathDir)

query.awaitTermination()


查看完整回答
反對 回復 2022-12-20
  • 1 回答
  • 0 關注
  • 114 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號