亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

熊貓:參差不齊的時間序列的時間加權滾動平均值

熊貓:參差不齊的時間序列的時間加權滾動平均值

aluckdog 2023-06-06 15:10:20
我有一個參差不齊(意思是不規則頻率)的時間索引 DataFrame,我想對其執行時間加權滾動平均值,以保持 DataFrame 的原始索引。假設記錄的值在被另一個值取代之前是有效的。實現這一點的一種方法是將參差不齊的 DataFrame 上采樣到統一頻率,然后進行滾動平均:import pandas as pdimport numpy as npdef time_weighted_average_using_upsampling(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:    # Leads to high memory usage    original_index = df.index.copy()    avg = (        df.resample("1s")        .ffill()        .rolling(avg_window, closed="left", min_periods=int(avg_window[0])))        .mean()        .reindex(original_index)    )    return avgif __name__ == "__main__":    df = pd.DataFrame(        {"A": [0, 1, 2, 3, 4, 5]},        index=[            pd.Timestamp("20130101 09:00:00"),            pd.Timestamp("20130101 09:00:02"),            pd.Timestamp("20130101 09:00:03"),            pd.Timestamp("20130101 09:00:05"),            pd.Timestamp("20130101 09:00:06"),            pd.Timestamp("20130101 09:00:10"),        ],    )    expected_avg = pd.DataFrame(        {"A": [np.nan, np.nan, 1 / 3, 5 / 3, 7 / 3, 4]},        index=[            pd.Timestamp("20130101 09:00:00"),            pd.Timestamp("20130101 09:00:02"),            pd.Timestamp("20130101 09:00:03"),            pd.Timestamp("20130101 09:00:05"),            pd.Timestamp("20130101 09:00:06"),            pd.Timestamp("20130101 09:00:10"),        ],    )    pd.testing.assert_frame_equal(        time_weighted_average_using_upsampling(df=df, avg_window="3s"), expected_avg    )這個問題是上采樣破壞了參差不齊的 df 提供的稀疏表示的目的。稀疏表示具有內存效率,而上采樣版本則不然。這就引出了一個問題:如何在不必對整個 df 進行上采樣的情況下獲得上面顯示的結果?
查看完整描述

2 回答

?
開滿天機

TA貢獻1786條經驗 獲得超13個贊

這是一個替代方案,而不是對整個數據幀進行上采樣,您可以首先檢查兩行之間的時間差異大于間隙的位置。然后將 3s 刪除到具有這些特定新時間戳的聯合的間隙和reindexdf 的行。創建這些行后,您可以groupby使用添加新索引的位置,resample每組 1 秒,最后rolling使用您所做的方法。Reindex最后有df。


rule = 3

rolling_win = f'{rule}s'


sparse = df.index.to_series().diff().dt.total_seconds().ge(rule)

new_timestamps = df.index[sparse] - pd.Timedelta(seconds=rule)

print(new_timestamps) 

#DatetimeIndex(['2013-01-01 09:00:07'], dtype='datetime64[ns]', freq=None)


#reindex with the new 

df_ = df.reindex(df.index.union(new_timestamps))


#perform first the resample 1s per group, then clean the dataframe to do the rolling.mean

#finally reindex like original df

df_ = (df_.groupby(df_.index.isin(new_timestamps).cumsum())

          .resample("1s").ffill()

          .reset_index(level=0, drop=True).ffill()

          .rolling(rolling_win, closed="left", min_periods=rule)\

          .mean()

          .reindex(df.index)

      )

print(df_)

                            A

2013-01-01 09:00:00       NaN

2013-01-01 09:00:02       NaN

2013-01-01 09:00:03  0.333333

2013-01-01 09:00:05  1.666667

2013-01-01 09:00:06  2.333333

2013-01-01 09:00:10  4.000000

在這種情況下,它并不是很有趣,因為差距實際上很小,但如果差距很大,那么它就會變得有用。


編輯或其他選項,可能更好,union所有索引都是從您刪除 1s、2s、3s 的原始索引中創建的,...(取決于規則)?,F在你只有滾動 so和reindex所需的索引。最后結果一樣ffillrolling.mean


from functools import reduce


rule = 3

rolling_win = f'{rule}s'


idx = df.index

df_ = (df.reindex(reduce(lambda x, y: x.union(y), 

                         [idx - pd.Timedelta(seconds=i) 

                          for i in range(0, rule+1)]))

         .ffill()

         .rolling(rolling_win, closed="left", min_periods=rule)\

         .mean()

         .reindex(df.index)

        )


查看完整回答
反對 回復 2023-06-06
?
撒科打諢

TA貢獻1934條經驗 獲得超2個贊

啟發的兩種可能的解決方案:


def time_weighted_average_using_local_upsampling(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:

    """Uses second resolution up-sampling only on smaller windows at a time."""

    original_index = df.index.copy()

    avg = (

        df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill")

        .rolling(avg_window, closed="both", min_periods=2)

        .apply(lambda x: x.resample("1s").ffill()[:-1].mean(skipna=False))

        .reindex(original_index)

    )

    return avg



def time_weighted_average_using_index_weighting(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:

    """Uses weighting by duration, by ensuring every window has a point at the start."""

    original_index = df.index.copy()

    avg = (

        df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill")

        .rolling(avg_window, closed="both", min_periods=2)

        .apply(lambda x: np.average(x[:-1], weights=x.index.to_series().diff()[1:].dt.seconds))

        .reindex(original_index)

    )

    return avg

第一個一次對單個滾動窗口進行上采樣,而后者實際上通過確保在我們關心的窗口開始處始終有一個可用點來進行參差不齊的時間加權平均。這是通過包括按窗口長度移動的原始索引來完成的。


我還沒有衡量相關案例的表現。


編輯: 我決定在大約 100,000 行的第二個分辨率數據集上測試函數,并使用 20 分鐘的窗口(?。﹥煞N變體都慢得令人難以忍受,但我認為我有一個新的贏家:


def time_weighted_average_using_index_weighting2(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:

    """Uses weighting by duration, by ensuring every window has a point at the start."""

    original_index = df.index.copy()

    avg = df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill")

    avg = (

        avg.multiply(avg.index.to_series().diff().dt.seconds.shift(-1), axis=0)

        .divide(pd.Timedelta(avg_window).seconds)

        .rolling(avg_window, closed="left")

        .sum()

        .reindex(original_index)

    )

    avg[~((avg.index - pd.Timedelta(avg_window)) >= original_index[0])] = np.nan

    return avg

這個在滾動之前預先加權,因此我們不用使用.sum()而不是apply(). 這轉化為巨大的速度提升。無論平均窗口的大小如何,我們最多也可以將索引加倍。


查看完整回答
反對 回復 2023-06-06
  • 2 回答
  • 0 關注
  • 163 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號