亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Python Spark - 在閾值后刪除數據 - Pyspark

Python Spark - 在閾值后刪除數據 - Pyspark

狐的傳說 2023-10-26 10:23:10
如何刪除最后TP == 1一個具有 48 小時緩沖區的數據?例如ID = A9,最后一個TP == 1是 on 2020-05-06 13:00。我想保留該組 ID 的所有數據,直到2020-05-06 13:00最后TP == 1加上接下來的 2 天?+---++--------+----------------+| id|       TP|            Date|+---+---------+----------------+| A1|     Null|2010-01-01 12:00|| A1|     Null|2010-01-01 13:00|| A1|        1|2010-01-02 01:00|| A1|     Null|2010-01-02 02:00|| A9|     Null|2010-05-05 12:00|| A9|        1|2010-05-05 13:00|| A9|        1|2010-05-06 13:00|| A9|     Null|2010-05-09 13:00|+---+---------+----------------+所需的數據框+---++--------+----------------+| id|       TP|            Date|+---+---------+----------------+| A1|     Null|2010-01-01 12:00|| A1|     Null|2010-01-01 13:00|| A1|        1|2010-01-02 01:00|| A1|     Null|2010-01-02 02:00|| A9|     Null|2010-05-05 12:00|| A9|        1|2010-05-05 13:00|| A9|        1|2010-05-06 13:00|+---+---------+----------------+這就是我在 Pandas 中所做的,但對于 15M+ 的觀察結果效率不高main_pd = main.toPandas()bigdf = pd.DataFrame()for i in main_pd.ID.unique():  df = main_pd[main_pd.ID == i]  TPdate = df[df.TP == 1]['Date'].max()+pd.Timedelta('3 days 0 hours')  df = df[(df.Date <= TPdate)]  bigdf = bigdf.append(df)
查看完整描述

2 回答

?
月關寶盒

TA貢獻1772條經驗 獲得超5個贊

IIUC,您可以使用窗口函數查找max(IF(TP=1, Date, NULL))每個id,然后按此閾值進行過濾:


from pyspark.sql import Window, functions as F

w1 = Window.partitionBy('id')


df_new = df.withColumn('Date', F.to_timestamp('Date', 'yyyy-MM-dd HH:mm')) \

    .withColumn('threshhold_date', F.expr("max(IF(TP=1, Date, NULL))").over(w1)) \

    .filter('Date <= threshhold_date + interval 2 days') 

df_new.show()

+---+----+-------------------+-------------------+

| id|  TP|               Date|    threshhold_date|

+---+----+-------------------+-------------------+

| A9|Null|2010-05-05 12:00:00|2010-05-06 13:00:00|

| A9|   1|2010-05-05 13:00:00|2010-05-06 13:00:00|

| A9|   1|2010-05-06 13:00:00|2010-05-06 13:00:00|

| A1|Null|2010-01-01 12:00:00|2010-01-02 01:00:00|

| A1|Null|2010-01-01 13:00:00|2010-01-02 01:00:00|

| A1|   1|2010-01-02 01:00:00|2010-01-02 01:00:00|

| A1|Null|2010-01-02 02:00:00|2010-01-02 01:00:00|

+---+----+-------------------+-------------------+


查看完整回答
反對 回復 2023-10-26
?
阿晨1998

TA貢獻2037條經驗 獲得超6個贊

您可以簡單地過濾數據幀TP = 1, 并使用collect()[0]來獲取列的最大值Date作為變量。

使用以下命令向該變量添加 48 小時timedelta并過濾df:



from pyspark.sql.functions import *

from datetime import timedelta


date_var = df.filter(col("TP")==1).orderBy("date", ascending=False)\

                .collect()[0]["date"] + timedelta(hours=48)


df.filter(col("Date")<=date_var).show()


查看完整回答
反對 回復 2023-10-26
  • 2 回答
  • 0 關注
  • 151 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號