1 回答

TA貢獻1812條經驗 獲得超5個贊
import pandas as pd
import numpy as np

# Query windows: four (start_date, end_date) ranges for each of 'A' and 'B'.
df1 = pd.DataFrame({
    'name': ['A'] * 4 + ['B'] * 4,
    'start_date': pd.to_datetime([
        '2000-03-15', '2000-06-12', '2000-09-01', '2001-01-17',
        '2000-03-19', '2000-06-14', '2000-09-14', '2001-01-22',
    ]),
    'end_date': pd.to_datetime([
        '2000-06-12', '2000-09-01', '2001-01-17', '2001-03-19',
        '2000-06-14', '2000-09-14', '2001-01-22', '2001-02-01',
    ]),
})

# Daily observations: the same calendar repeated once per name, with random
# `low` values and `high` drawn so that it is never below `low`.
date = pd.date_range('2000-01-01', '2002-01-01')
name = [label for label in ('A', 'B') for _ in range(len(date))]
date = date.append(date)
low = np.random.rand(len(date))
high = low + np.random.rand(len(date))

# Index by date so the lookups below can slice with `.loc[start:end]`.
df2 = pd.DataFrame(
    {'name': name, 'date': date, 'low': low, 'high': high}
).set_index('date')
def find_max(row):
    """Return the largest ``high`` in ``df2`` for one query window.

    ``row`` must provide ``name``, ``start_date`` and ``end_date``. The
    module-level ``df2`` is filtered to that name and then label-sliced on
    its index from ``start_date`` to ``end_date``.
    """
    same_name = df2[df2['name'] == row['name']]
    highs = same_name.loc[row['start_date']:row['end_date'], 'high']
    return highs.max()
def find_min(row):
    """Return the smallest ``low`` in ``df2`` for one query window.

    ``row`` must provide ``name``, ``start_date`` and ``end_date``. The
    module-level ``df2`` is filtered to that name and then label-sliced on
    its index from ``start_date`` to ``end_date``.
    """
    name_mask = df2['name'] == row['name']
    lows = df2[name_mask].loc[row['start_date']:row['end_date'], 'low']
    return lows.min()
# Evaluate every query window row by row and store each result as a new
# column of df1 ('maximum' first, then 'minimum').
for column, finder in (('maximum', find_max), ('minimum', find_min)):
    df1[column] = df1.apply(finder, axis=1)
嘗試在同一次篩選中同時找出最小值和最大值,這樣每一行只需要篩選一次 df2,可能會節省一些時間。
def find_min_max(row):
    """Return the window's highest ``high`` and lowest ``low`` in one pass.

    ``row`` must provide ``name``, ``start_date`` and ``end_date``. The
    module-level ``df2`` is filtered by name and sliced by date only once,
    and both extremes are read from that single slice.

    Returns a ``pd.Series`` with the keys ``'maximum'`` and ``'minimum'``.
    """
    same_name = df2[df2['name'] == row['name']]
    window = same_name.loc[row['start_date']:row['end_date']]
    return pd.Series({
        'maximum': window['high'].max(),
        'minimum': window['low'].min(),
    })
# One row of {'maximum', 'minimum'} per query window, aligned on df1's index.
min_max_by_row = df1.apply(find_min_max, axis=1)
# merge() returns a new frame; the result is left as a bare expression here
# (df1 itself is not modified).
df1.merge(min_max_by_row, left_index=True, right_index=True)
試試這個:使用多進程(multiprocessing)和共享內存。將其保存為 .py 文件並從命令行運行,應該會快得多。我將 n_workers 設置為 4,您可以依需要更改。
import numpy as np
import pandas as pd
from multiprocessing.shared_memory import SharedMemory
from concurrent.futures import ProcessPoolExecutor, as_completed
def find_min_max(name, data_info):
    """Fill ``maximum``/``minimum`` for every window of ``name`` in shared memory.

    Args:
        name: Value compared against the ``name`` field of both record
            arrays (``main`` passes bytes, matching the ``S20`` fields).
        data_info: Two ``(shm_name, shape, dtype)`` tuples. The first
            describes the window records (fields ``name``, ``start_date``,
            ``end_date``, ``maximum``, ``minimum``); the second describes the
            observation records (fields ``name``, ``date``, ``low``,
            ``high``). The observations of one name must be sorted by
            ``date`` because they are searched with ``np.searchsorted``.

    Returns:
        None. Results are written in place into the first shared block.

    Both ends of a window are inclusive, matching ``.loc[start:end]`` label
    slicing. A window that contains no observation gets NaN in both fields.
    """
    win_shm_name, win_shape, win_dtype = data_info[0]
    obs_shm_name, obs_shape, obs_dtype = data_info[1]

    shm1 = SharedMemory(win_shm_name)
    try:
        shm2 = SharedMemory(obs_shm_name)
        try:
            np1 = np.recarray(shape=win_shape, dtype=win_dtype, buf=shm1.buf)
            np2 = np.recarray(shape=obs_shape, dtype=obs_dtype, buf=shm2.buf)

            # Boolean indexing copies this name's observations out of the
            # shared block, so the searches below work on private data.
            data2 = np2[np2['name'] == name]
            dates = data2['date']
            highs = data2['high']
            lows = data2['low']

            # Field views that write straight through to shared memory.
            starts = np1['start_date']
            ends = np1['end_date']
            maxima = np1['maximum']
            minima = np1['minimum']

            # Address rows by their position in the array. The stored
            # 'index' field holds DataFrame labels, which need not equal
            # array positions.
            for pos in np.flatnonzero(np1['name'] == name):
                idx1 = np.searchsorted(dates, starts[pos], side='left')
                # side='right' keeps observations dated exactly end_date.
                idx2 = np.searchsorted(dates, ends[pos], side='right')
                if idx1 < idx2:
                    maxima[pos] = highs[idx1:idx2].max()
                    minima[pos] = lows[idx1:idx2].min()
                else:
                    # Empty window: .max()/.min() would raise on it.
                    maxima[pos] = np.nan
                    minima[pos] = np.nan

            # Drop every view of the shared buffers before closing them.
            del np1, np2, starts, ends, maxima, minima
        finally:
            shm2.close()
    finally:
        # Pool workers are reused across tasks; close the handle so
        # mappings do not pile up in the worker process.
        shm1.close()
def main():
    """Build the demo data, compute per-window min/max in worker processes, print it.

    The two frames are converted to NumPy record arrays and copied into
    shared-memory blocks. One task per distinct name is submitted to a
    process pool; each task writes its results into the first block, which
    is printed once all tasks have finished.

    Raises:
        Any exception raised inside a worker task is re-raised here. The
        shared-memory blocks are closed and unlinked in every case.
    """
    np.random.seed(12345)
    df1 = pd.DataFrame({
        'name': ['A'] * 4 + ['B'] * 4,
        'start_date': pd.to_datetime([
            '2000-03-15', '2000-06-12', '2000-09-01', '2001-01-17',
            '2000-03-19', '2000-06-14', '2000-09-14', '2001-01-22',
        ]),
        'end_date': pd.to_datetime([
            '2000-06-12', '2000-09-01', '2001-01-17', '2001-03-19',
            '2000-06-14', '2000-09-14', '2001-01-22', '2001-02-01',
        ]),
    })
    date = pd.date_range('2000-01-01', '2002-01-01')
    name = ['A'] * len(date) + ['B'] * len(date)
    date = date.append(date)
    low = np.random.rand(len(date))
    high = low + np.random.rand(len(date))
    df2 = pd.DataFrame({'name': name, 'date': date, 'low': low, 'high': high})

    # Reset the index after sorting so that the 'index' field written by
    # to_records() equals each row's position in the record array.
    df1 = df1.sort_values('name', kind='stable').reset_index(drop=True)
    # Workers binary-search the dates of one name, so they must be sorted.
    df2 = df2.sort_values(['name', 'date'])
    # Placeholder values, overwritten by the workers.
    df1['maximum'] = -1.0
    df1['minimum'] = -1.0

    # NOTE(review): 'S20' keeps at most 20 bytes per name; confirm that real
    # names fit, otherwise distinct names could compare equal.
    np1 = df1.to_records(column_dtypes={
        'name': '|S20', 'start_date': '<M8[ns]', 'end_date': '<M8[ns]'})
    np2 = df2.to_records(column_dtypes={
        'name': '|S20', 'date': '<M8[ns]', 'low': '<f8', 'high': '<f8'})
    names = [unique_name.encode() for unique_name in df1['name'].unique()]
    del df1, df2

    # Let the OS choose the block names: a fixed name left behind by a
    # crashed run would make create=True fail with FileExistsError.
    shm1 = SharedMemory(create=True, size=np1.nbytes)
    try:
        shm2 = SharedMemory(create=True, size=np2.nbytes)
        try:
            shm1_np_array = np.recarray(
                shape=np1.shape, dtype=np1.dtype, buf=shm1.buf)
            np.copyto(shm1_np_array, np1)
            shm2_np_array = np.recarray(
                shape=np2.shape, dtype=np2.dtype, buf=shm2.buf)
            np.copyto(shm2_np_array, np2)
            data_info = [
                (shm1.name, np1.shape, np1.dtype),
                (shm2.name, np2.shape, np2.dtype),
            ]
            del np1, np2

            # Set number of workers
            n_workers = 4
            with ProcessPoolExecutor(n_workers) as exe:
                fs = [exe.submit(find_min_max, worker_name, data_info)
                      for worker_name in names]
                for future in as_completed(fs):
                    # result() re-raises a worker failure instead of
                    # leaving the -1.0 placeholders in the output.
                    future.result()

            print(shm1_np_array)
            # Drop the views before the buffers are released.
            del shm1_np_array, shm2_np_array
        finally:
            shm2.close()
            shm2.unlink()
    finally:
        shm1.close()
        shm1.unlink()


# The guard keeps worker processes that re-import this module (spawn start
# method) from running main() again.
if __name__ == "__main__":
    main()
添加回答
舉報