亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

pyspark計算當前時間和上次活動時間之間差異的移動平均值

pyspark計算當前時間和上次活動時間之間差異的移動平均值

海綿寶寶撒 2021-11-09 16:23:20
我有一些這樣的記錄。A    B1    2018-12-252    2019-01-151    2019-01-203    2018-01-012    2019-01-014    2018-04-093    2018-11-081    2018-03-20我想要得到的是這樣的東西。第一步,在組內按升序排列。(不需要按A訂購)A    B1    2018-03-201    2018-12-251    2019-01-203    2018-01-013    2018-11-082    2019-01-012    2019-01-154    2018-04-09第二步,獲取組內連續行之間的時間差。A    B            C1    2018-03-20   NaN1    2018-12-25   2801    2019-01-20   263    2018-01-01   NaN3    2018-11-08   3112    2019-01-01   NaN2    2019-01-15   144    2018-04-09   NaN第三步,得到窗口大小為2的C的移動平均。(因為我只提供了很少的行作為例子,為了方便就選擇大小2)A    B            C     moving_avg1    2018-03-20   NaN   NaN1    2018-12-25   280   2801    2019-01-20   26    1533    2018-01-01   NaN   NaN3    2018-11-08   311   3112    2019-01-01   NaN   NaN2    2019-01-15   14    144    2018-04-09   NaN   NaN如果 Windows 函數可以處理這種情況,該解決方案實際上不需要生成 C 列。我列出每個步驟只是為了確保您可以清楚地了解問題所在。結果集將如下所示A    B            moving_avg1    2018-03-20   NaN1    2018-12-25   2801    2019-01-20   1533    2018-01-01   NaN3    2018-11-08   3112    2019-01-01   NaN2    2019-01-15   144    2018-04-09   NaN注意:這是在 pyspark 上并使用數據框。不是在 Python 上使用 Pandas。
查看完整描述

2 回答

?
湖上湖

TA貢獻2003條經驗 獲得超2個贊

可能有更聰明的方法來實現這一點,但您也可以使用 RDD :


from operator import add

from numpy import mean

from datetime import datetime


data = [(1, "2018-12-25"), (2, "2019-01-15"), (1, "2019-01-20"), (3, "2018-01-01"),

        (2, "2019-01-01"), (4, "2018-04-09"), (3, "2018-11-08"), (1, "2018-03-20")]

data = sc.parallelize(data).mapValues(lambda v: [datetime.strptime(v, "%Y-%m-%d")]).reduceByKey(add)


def computeMvgAvg(values):

sorted_date = sorted(values)

diffs = []

mvg_avg = []

for i in range(1, len(sorted_date)):

    diffs.append(int((sorted_date[i] - sorted_date[i-1]).total_seconds()/86400))

    mvg_avg.append(int(mean(diffs)))

diffs = [None] + diffs

mvg_avg = [None] + mvg_avg

return zip(sorted_date, diffs, mvg_avg)


sch = StructType([

   StructField("A", StringType(), True),

   StructField("B", DateType(), True),

   StructField("C", IntegerType(), True),

   StructField("moving_avg", IntegerType(), True)

])

data.flatMapValues(myMapValues).map(lambda row: [row[0]] + list(row[1])).toDF(schema=sch).show()


+---+----------+----+----------+

|  A|         B|   C|moving_avg|

+---+----------+----+----------+

|  1|2018-03-20|null|      null|

|  1|2018-12-25| 280|       280|

|  1|2019-01-20|  26|       153|

|  2|2019-01-01|null|      null|

|  2|2019-01-15|  14|        14|

|  3|2018-01-01|null|      null|

|  3|2018-11-08| 311|       311|

|  4|2018-04-09|null|      null|

+---+----------+----+----------+


查看完整回答
反對 回復 2021-11-09
?
慕姐8265434

TA貢獻1813條經驗 獲得超2個贊

文檔: 窗口

文檔: 滯后

# Creating a Dataframe

from pyspark.sql.window import Window

from pyspark.sql.functions import col, to_date, lag, datediff, when, udf

df = sqlContext.createDataFrame([(1,'2018-12-25'),(2,'2019-01-15'),(1,'2019-01-20'),(3,'2018-01-01'),

                                 (2,'2019-01-01'),(4,'2018-04-09'),(3,'2018-11-08'),(1,'2018-03-20')],

                                 ['A','B'])

df = df.withColumn('B',to_date(col('B'), 'yyyy-MM-dd'))


# Using window and lag functions to find the value from previous row

my_window = Window.partitionBy('A').orderBy('A','B')


# Creating a UDF to calculate average of window sized 2.

def row_avg(c1,c2):

    count_non_null = 2

    total = 0

    if c1 == None:

        c1 = 0

        count_non_null = count_non_null - 1

    if c2 == None:

        c2 = 0

        count_non_null = count_non_null - 1

    if count_non_null == 0:

        return None

    else:

        return int((c1+c2)/count_non_null)


row_avg = udf(row_avg)


df = df.withColumn('B_Lag_1', lag(col('B'),1).over(my_window))\

       .withColumn('C', datediff(col('B'),col('B_Lag_1'))).drop('B_Lag_1')\

       .withColumn('C_Lag_1', lag(col('C'),1).over(my_window))\

       .withColumn('moving_avg',row_avg(col('C'),col('C_Lag_1'))).drop('C','C_Lag_1')

df.show()

+---+----------+----------+

|  A|         B|moving_avg|

+---+----------+----------+

|  1|2018-03-20|      null|

|  1|2018-12-25|       280|

|  1|2019-01-20|       153|

|  3|2018-01-01|      null|

|  3|2018-11-08|       311|

|  2|2019-01-01|      null|

|  2|2019-01-15|        14|

|  4|2018-04-09|      null|

+---+----------+----------+


查看完整回答
反對 回復 2021-11-09
  • 2 回答
  • 0 關注
  • 172 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號