1 回答

TA貢獻1895條經驗 獲得超7個贊
這是我嘗試進行一些修改,例如,我無法理解如何存在 62 秒。
from pyspark.sql.functions import *
from pyspark.sql import Window
w = Window.orderBy('time')
df.select('id', 'time') \
.withColumn('time', to_timestamp('time', 'yyyy-MM-dd HH:mm:ss.SSS')) \
.withColumn('time2', coalesce(lead('time', 1).over(w), expr('time + interval 10 seconds'))) \
.withColumn('seq', expr("sequence(time, time2 + interval 5 seconds, interval 5 seconds)")) \
.withColumn('time', explode('seq')) \
.select('id', 'time') \
.join(df, ['id', 'time'], 'left') \
.fillna(0).show(20, False)
+---+-----------------------+-----+
|id |time |Value|
+---+-----------------------+-----+
|id1|2020-02-22 04:57:36.843|1.4 |
|id1|2020-02-22 04:57:41.843|0.0 |
|id1|2020-02-22 04:57:46.843|0.0 |
|id1|2020-02-22 04:57:51.843|0.0 |
|id2|2020-02-22 04:57:50.85 |1.7 |
|id2|2020-02-22 04:57:55.85 |0.0 |
|id2|2020-02-22 04:58:00.85 |0.0 |
|id3|2020-02-22 04:57:59.133|1.2 |
|id3|2020-02-22 04:58:04.133|0.0 |
|id3|2020-02-22 04:58:09.133|0.0 |
|id3|2020-02-22 04:58:14.133|0.0 |
+---+-----------------------+-----+
添加回答
舉報