2 回答

TA貢獻1789條經驗 獲得超10個贊
在 spark 2.4 中,您可以map_from_arrays在匯總每只股票的值時使用它來構建日期值映射。那么create_map使用股票代碼作為關鍵只是一個使用問題。此示例使用ChainMappython 3.4 來構建您所描述的最終 dict 結構。
import json
from collections import ChainMap
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession \
.builder \
.appName("example") \
.getOrCreate()
df = spark.createDataFrame([
(1388534400, "GOOG", 50),
(1388534400, "FB", 60),
(1388534400, "MSFT", 55),
(1388620800, "GOOG", 52),
(1388620800, "FB", 61),
(1388620800, "MSFT", 55)]
).toDF("date", "stock", "price")
out = df.groupBy("stock") \
.agg(
map_from_arrays(
collect_list("date"), collect_list("price")).alias("values")) \
.select(create_map("stock", "values").alias("values")) \
.rdd.flatMap(lambda x: x) \
.collect()
print(json.dumps(dict(ChainMap(*out)), indent=4, separators=(',', ': '), sort_keys=True))
這使:
{
"FB": {
"1388534400": 60,
"1388620800": 61
},
"GOOG": {
"1388534400": 50,
"1388620800": 52
},
"MSFT": {
"1388534400": 55,
"1388620800": 55
}
}
但是,正如您所說,您有很多數據,您實際上可能并不想在內存中創建此字典,因此最好將其拆分并將相同的字典結構寫入不同分區的文件中。
讓我們通過將日期截斷到給定月份并為每個月和每個股票編寫單獨的文件來做到這一點:
out = df.groupBy(trunc(expr("CAST(date as TIMESTAMP)"), "month").alias("month"), df["stock"]) \
.agg(
map_from_arrays(
collect_list("date"), collect_list("price")).alias("values")) \
.select("month", "stock", create_map("stock", "values").alias("values"))
out.write.partitionBy("month", "stock").format("json").save("out/prices")
這為您提供了如下結構:
out
└── prices
├── _SUCCESS
└── month=2014-01-01
├── stock=FB
│ └── part-00093-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json
├── stock=GOOG
│ └── part-00014-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json
└── stock=MSFT
└── part-00152-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json
MSFT 文件如下所示:
{"values":{"MSFT":{"1388534400":55,"1388620800":55}}}
雖然“值”列名稱可能不在您的字典結構中,但我希望這說明了您可以做什么。
添加回答
舉報