2 回答

TA貢獻1786條經驗 獲得超13個贊
這是一個替代方案,而不是對整個數據幀進行上采樣,您可以首先檢查兩行之間的時間差異大于間隙的位置。然后將 3s 刪除到具有這些特定新時間戳的聯合的間隙和reindexdf 的行。創建這些行后,您可以groupby使用添加新索引的位置,resample每組 1 秒,最后rolling使用您所做的方法。Reindex最后有df。
rule = 3
rolling_win = f'{rule}s'
sparse = df.index.to_series().diff().dt.total_seconds().ge(rule)
new_timestamps = df.index[sparse] - pd.Timedelta(seconds=rule)
print(new_timestamps)
#DatetimeIndex(['2013-01-01 09:00:07'], dtype='datetime64[ns]', freq=None)
#reindex with the new
df_ = df.reindex(df.index.union(new_timestamps))
#perform first the resample 1s per group, then clean the dataframe to do the rolling.mean
#finally reindex like original df
df_ = (df_.groupby(df_.index.isin(new_timestamps).cumsum())
.resample("1s").ffill()
.reset_index(level=0, drop=True).ffill()
.rolling(rolling_win, closed="left", min_periods=rule)\
.mean()
.reindex(df.index)
)
print(df_)
A
2013-01-01 09:00:00 NaN
2013-01-01 09:00:02 NaN
2013-01-01 09:00:03 0.333333
2013-01-01 09:00:05 1.666667
2013-01-01 09:00:06 2.333333
2013-01-01 09:00:10 4.000000
在這種情況下,它并不是很有趣,因為差距實際上很小,但如果差距很大,那么它就會變得有用。
編輯或其他選項,可能更好,union所有索引都是從您刪除 1s、2s、3s 的原始索引中創建的,...(取決于規則)?,F在你只有滾動 so和reindex所需的索引。最后結果一樣ffillrolling.mean
from functools import reduce
rule = 3
rolling_win = f'{rule}s'
idx = df.index
df_ = (df.reindex(reduce(lambda x, y: x.union(y),
[idx - pd.Timedelta(seconds=i)
for i in range(0, rule+1)]))
.ffill()
.rolling(rolling_win, closed="left", min_periods=rule)\
.mean()
.reindex(df.index)
)

TA貢獻1934條經驗 獲得超2個贊
啟發的兩種可能的解決方案:
def time_weighted_average_using_local_upsampling(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:
"""Uses second resolution up-sampling only on smaller windows at a time."""
original_index = df.index.copy()
avg = (
df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill")
.rolling(avg_window, closed="both", min_periods=2)
.apply(lambda x: x.resample("1s").ffill()[:-1].mean(skipna=False))
.reindex(original_index)
)
return avg
def time_weighted_average_using_index_weighting(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:
"""Uses weighting by duration, by ensuring every window has a point at the start."""
original_index = df.index.copy()
avg = (
df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill")
.rolling(avg_window, closed="both", min_periods=2)
.apply(lambda x: np.average(x[:-1], weights=x.index.to_series().diff()[1:].dt.seconds))
.reindex(original_index)
)
return avg
第一個一次對單個滾動窗口進行上采樣,而后者實際上通過確保在我們關心的窗口開始處始終有一個可用點來進行參差不齊的時間加權平均。這是通過包括按窗口長度移動的原始索引來完成的。
我還沒有衡量相關案例的表現。
編輯: 我決定在大約 100,000 行的第二個分辨率數據集上測試函數,并使用 20 分鐘的窗口(?。﹥煞N變體都慢得令人難以忍受,但我認為我有一個新的贏家:
def time_weighted_average_using_index_weighting2(df: pd.DataFrame, avg_window: str) -> pd.DataFrame:
"""Uses weighting by duration, by ensuring every window has a point at the start."""
original_index = df.index.copy()
avg = df.reindex(df.index.union(df.index.shift(periods=-1, freq=avg_window)), method="ffill")
avg = (
avg.multiply(avg.index.to_series().diff().dt.seconds.shift(-1), axis=0)
.divide(pd.Timedelta(avg_window).seconds)
.rolling(avg_window, closed="left")
.sum()
.reindex(original_index)
)
avg[~((avg.index - pd.Timedelta(avg_window)) >= original_index[0])] = np.nan
return avg
這個在滾動之前預先加權,因此我們不用使用.sum()而不是apply(). 這轉化為巨大的速度提升。無論平均窗口的大小如何,我們最多也可以將索引加倍。
添加回答
舉報