I want to compute an aggregate over streaming data and then send it to a web page. For example, I want to compute the sum of the TotalSales column of the stream, but this line fails: summary = dataStream.select('TotalSales').groupby().sum().toPandas(). Here is my code:

import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.functions import *

spark = (SparkSession.builder
         .appName("Python Spark SQL basic example")
         .config("spark.some.config.option", "some-value")
         .getOrCreate())

schema = (StructType()
          .add("_c0", "integer")
          .add("InvoiceNo", "string")
          .add("Quantity", "integer")
          .add("InvoiceDate", "date")
          .add("UnitPrice", "integer")
          .add("CustomerID", "double")
          .add("TotalSales", "integer"))

INPUT_DIRECTORY = "C:/Users/HP/Desktop/test/jsonFile"
dataStream = spark.readStream.format("json").schema(schema).load(INPUT_DIRECTORY)

query = dataStream.writeStream.format("console").start()
summary = dataStream.select('TotalSales').groupby().sum().toPandas()
print(query.id)
query.awaitTermination()

And this is the error shown on the command line:

Traceback (most recent call last):
  File "testStreaming.py", line 12, in <module>
    dataStream = dataStream.toPandas()
  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\pyspark\sql\dataframe.py", line 2150, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\pyspark\sql\dataframe.py", line 534, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\pyspark\sql\utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nFileSource[C:/Users/HP/Desktop/test/jsonFile]'

Thanks for any answers.
1 Answer

滄海一幻覺
Why are you creating a pandas DataFrame in the first place?
toPandas creates a DataFrame that lives locally on your driver node. I'm not sure what you are trying to achieve here: a pandas DataFrame represents a fixed set of rows, whereas a Structured Streaming source is a continuous flow of data.
One possible solution is to run the whole pipeline you have in mind, write its output to parquet/csv files, and then build the pandas DataFrame from those parquet/csv files.
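To see the distinction, here is a minimal sketch with made-up sample values: on a fixed, local pandas table an aggregate has one final answer, which is exactly what toPandas() assumes and what a never-ending stream cannot provide.

```python
import pandas as pd

# Hypothetical sample rows mirroring the stream's TotalSales column.
df = pd.DataFrame({"TotalSales": [10, 25, 5]})

# A fixed table has a single, final value for an aggregate...
print(df["TotalSales"].sum())  # 40

# ...whereas a streaming source keeps growing, so there is no final
# set of rows for toPandas() to collect.
```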
# A streaming aggregation like this cannot be written to the file sink
# (which is append-only); write the raw stream instead and aggregate
# after reading the files back.
summary = dataStream.select('TotalSales').groupby().sum()

query = (dataStream.writeStream
         .format("parquet")
         .outputMode("append")  # the file sink supports only append mode
         .option("checkpointLocation", checkpointDir)  # required for file sinks; any writable path
         .start(outputPathDir))
query.awaitTermination()