2 回答

TA貢獻1805條經驗 獲得超10個贊
您的基本算法是“我希望將 的前 10 個值df['Node']設置為 的第一個值ndf,將接下來的 10 個值設置為 的下一個值ndf,依此類推”。這在 Dask 中很難,因為它不知道每個分區中有多少行:您正在從 CSV 讀取,并且您在 X 字節中獲得的行數取決于每個部分中的數據是什么樣的. 其他格式為您提供更多信息...
因此,您肯定需要兩次遍歷數據。您可以使用索引來找出劃分并可能進行一些排序。在我看來,你能做的最簡單的事情就是測量分割長度,然后得到每個開始的偏移量:
lengths = df.map_partitions(len).compute()
offsets = np.cumsum(lengths.values)
offsets -= offsets[0]
現在使用自定義延遲功能來處理零件
@dask.delayed
def add_node(part, offset, ndf):
index = pd.Series(range(offset, offset + len(part)) // 10,
index=part.index) # 10 is the repeat factor
part['Node'] = index.map(ndf)
return part
df2 = dd.from_delayed([add_node(d, off, ndf)
for d, off in zip(df.to_delayed(), offsets)])

TA貢獻1830條經驗 獲得超3個贊
使用相同的工作流程,您可以divisions按照此處的建議手動設置
import dask.dataframe as dd
import pandas as pd
import numpy as np
pd.DataFrame(np.random.rand(25000, 2)).to_csv("tempfile.csv", index=False)
df = dd.read_csv("tempfile.csv")
ndf = pd.DataFrame(np.random.randint(1000, 3500, size=2500))
df.divisions = (0, len(df)-1)
df["Note"] = dd.from_array(np.repeat(ndf.values, 10))
我不認為使用np.repeat是非常有效的,特別是對于大 df。
添加回答
舉報