亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

PySpark:從數據幀創建字典?

PySpark:從數據幀創建字典?

飲歌長嘯 2021-09-25 22:06:10
我有以下格式的數據,這些數據是從 Hive 獲取到數據幀中的:date, stock, price1388534400, GOOG, 501388534400, FB, 601388534400, MSFT, 551388620800, GOOG, 521388620800, FB, 611388620800, MSFT, 55其中 date 是當天午夜的紀元,我們有可以追溯到 10 年左右的數據(8 億多行)。我想得到一本字典如下:{'GOOG':{'1388534400': 50,'1388620800': 52}'FB':{'1388534400': 60,'1388620800': 61}}一種天真的方法是獲取唯一股票的列表,然后通過僅過濾掉每只股票的那些行來獲取數據幀的子集,但這似乎過于天真且效率極低。這可以在 Spark 中輕松完成嗎?我目前已經使用 PyHive 在本機 Python 中運行它,但由于數據量龐大,我寧愿在集群/Spark 上完成這項工作。
查看完整描述

2 回答

?
至尊寶的傳說

TA貢獻1789條經驗 獲得超10個贊

在 spark 2.4 中,您可以map_from_arrays在匯總每只股票的值時使用它來構建日期值映射。那么create_map使用股票代碼作為關鍵只是一個使用問題。此示例使用ChainMappython 3.4 來構建您所描述的最終 dict 結構。


import json

from collections import ChainMap

from pyspark.sql import SparkSession

from pyspark.sql.functions import *


spark = SparkSession \

    .builder \

    .appName("example") \

    .getOrCreate()


df = spark.createDataFrame([

    (1388534400, "GOOG", 50),

    (1388534400, "FB", 60),

    (1388534400, "MSFT", 55),

    (1388620800, "GOOG", 52),

    (1388620800, "FB", 61),

    (1388620800, "MSFT", 55)]

).toDF("date", "stock", "price")


out = df.groupBy("stock") \

        .agg(

            map_from_arrays(

                collect_list("date"), collect_list("price")).alias("values")) \

        .select(create_map("stock", "values").alias("values")) \

        .rdd.flatMap(lambda x: x) \

        .collect()


print(json.dumps(dict(ChainMap(*out)), indent=4, separators=(',', ': '), sort_keys=True))

這使:


{                                                                               

    "FB": {

        "1388534400": 60,

        "1388620800": 61

    },

    "GOOG": {

        "1388534400": 50,

        "1388620800": 52

    },

    "MSFT": {

        "1388534400": 55,

        "1388620800": 55

    }

}

但是,正如您所說,您有很多數據,您實際上可能并不想在內存中創建此字典,因此最好將其拆分并將相同的字典結構寫入不同分區的文件中。


讓我們通過將日期截斷到給定月份并為每個月和每個股票編寫單獨的文件來做到這一點:


out = df.groupBy(trunc(expr("CAST(date as TIMESTAMP)"), "month").alias("month"), df["stock"]) \

        .agg(

            map_from_arrays(

                collect_list("date"), collect_list("price")).alias("values")) \

        .select("month", "stock", create_map("stock", "values").alias("values"))


out.write.partitionBy("month", "stock").format("json").save("out/prices")

這為您提供了如下結構:


out

└── prices

    ├── _SUCCESS

    └── month=2014-01-01

        ├── stock=FB

        │   └── part-00093-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json

        ├── stock=GOOG

        │   └── part-00014-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json

        └── stock=MSFT

            └── part-00152-3741bdc2-345a-488e-82da-53bb586cd23b.c000.json

MSFT 文件如下所示:


{"values":{"MSFT":{"1388534400":55,"1388620800":55}}}

雖然“值”列名稱可能不在您的字典結構中,但我希望這說明了您可以做什么。


查看完整回答
反對 回復 2021-09-25
  • 2 回答
  • 0 關注
  • 147 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號