亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Spark SQL 分區依據、窗口、排序依據、計數

Spark SQL 分區依據、窗口、排序依據、計數

瀟瀟雨雨 2021-09-14 16:21:16
假設我有一個包含雜志訂閱信息的數據框:subscription_id    user_id       created_at       expiration_date 12384               1           2018-08-10        2018-12-10 83294               1           2018-06-03        2018-10-03 98234               1           2018-04-08        2018-08-08 24903               2           2018-05-08        2018-07-08 32843               2           2018-03-25        2018-05-25 09283               2           2018-04-07        2018-06-07現在我想添加一個列,說明在當前訂閱開始之前用戶有多少以前的訂閱已過期。換句話說,與給定用戶相關聯的到期日期在此訂閱的開始日期之前。這是完整的所需輸出:subscription_id    user_id       created_at       expiration_date   previous_expired 12384               1           2018-08-10        2018-12-10          1 83294               1           2018-06-03        2018-10-03          0 98234               1           2018-04-08        2018-08-08          0 24903               2           2018-05-08        2018-07-08          2 32843               2           2018-03-25        2018-05-03          1 09283               2           2018-01-25        2018-02-25          0嘗試:編輯:使用 Python 嘗試了各種滯后/領先/等,我現在認為這是一個 SQL 問題df = df.withColumn('shiftlag', func.lag(df.expires_at).over(Window.partitionBy('user_id').orderBy('created_at')))<--- 編輯,編輯:沒關系,這行不通我想我用盡了滯后/領先/轉移方法,發現它不起作用。我現在認為最好使用 Spark SQL 來做到這一點,也許使用 acase when來生成新列,結合 a having count,按 ID 分組?
查看完整描述

1 回答

?
神不在的星期二

TA貢獻1963條經驗 獲得超6個贊

使用 PySpark 解決了這個問題:


我首先創建了另一個列,其中包含每個用戶的所有到期日期數組:


joined_array = df.groupBy('user_id').agg(collect_set('expiration_date'))

然后將該數組加入到原始數據幀中:


joined_array = joined_array.toDF('user_idDROP', 'expiration_date_array')

df = df.join(joined_array, df.user_id == joined_array.user_idDROP, how = 'left').drop('user_idDROP')

然后創建一個函數來遍歷數組,如果創建日期大于到期日期,則將計數加 1:


def check_expiration_count(created_at, expiration_array):

  if not expiration_array:

    return 0

  else:

   count = 0

    for i in expiration_array:

  if created_at > i:

    count += 1

return count


check_expiration_count = udf(check_expiration_count, IntegerType())

然后應用該函數創建一個具有正確計數的新列:


df = df.withColumn('count_of_subs_ending_before_creation', check_expiration_count(df.created_at, df.expiration_array))

瓦拉。完畢。謝謝大家(沒有人幫忙,但還是謝謝)。希望有人在 2022 年發現這很有用


查看完整回答
反對 回復 2021-09-14
  • 1 回答
  • 0 關注
  • 283 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號