就我而言,我在 S3 中有多個文件和一個自定義函數,該函數讀取每個文件并使用所有線程處理它。為了簡化示例,我只生成一個數據幀df,并且假設我的函數是tsfresh.extract_features使用多重處理。生成數據import pandas as pdfrom tsfresh import extract_featuresfrom tsfresh.examples.robot_execution_failures import download_robot_execution_failures, \load_robot_execution_failuresdownload_robot_execution_failures()ts, y = load_robot_execution_failures()df = []for i in range(5): tts = ts.copy() tts["id"] += 88 * i df.append(tts) df = pd.concat(df, ignore_index=True)功能def fun(df, n_jobs): extracted_features = extract_features(df, column_id="id", column_sort="time", n_jobs=n_jobs)簇import daskfrom dask.distributed import Client, progressfrom dask import compute, delayedfrom dask_cloudprovider import FargateClustermy_vpc = # your vpcmy_subnets = # your subnetscpu = 2 ram = 4cluster = FargateCluster(n_workers=1, image='rpanai/feats-worker:2020-08-24', vpc=my_vpc, subnets=my_subnets, worker_cpu=int(cpu * 1024), worker_mem=int(ram * 1024), cloudwatch_logs_group="my_log_group", task_role_policies=['arn:aws:iam::aws:policy/AmazonS3FullAccess'], scheduler_timeout='20 minutes' )cluster.adapt(minimum=1, maximum=4)client = Client(cluster)client使用所有工作線程(失敗)to_process = [delayed(fun)(df, cpu) for i in range(10)]out = compute(to_process)AssertionError: daemonic processes are not allowed to have children僅使用一個線程(OK)在這種情況下,它工作正常,但我浪費資源。to_process = [delayed(fun)(df, 0) for i in range(10)]out = compute(to_process)問題我知道對于這個特定的功能,我最終可以使用多線程和其他一些技巧編寫一個自定義分配器,但我想分配一個工作,讓每個工作人員都可以利用所有資源,而不必擔心太多。
1 回答

元芳怎么了
TA貢獻1798條經驗 獲得超7個贊
我可以幫助回答您的具體問題tsfresh
,但 iftsfresh
只是一個簡單的玩具示例,可能不是您想要的。
對于tsfresh
,您通常不會混合使用tsfresh
dask 和 dask 的多重處理,而是讓 dask 執行所有處理。這意味著,您從一個單一的開始dask.DataFrame
(在您的測試用例中,您可以將 pandas 數據幀轉換為 dask 數據幀 - 對于您的讀取用例,您可以直接從S3
?docu讀?。?,然后在 dask 數據幀中分發特征提?。ㄌ卣魈崛〉暮锰幨牵诿總€時間序列上獨立工作。因此我們可以為每個時間序列生成一個作業)。
我不確定這是否有助于解決您更普遍的問題。在我看來,你(在大多數情況下)不想混合dask的分布函數和“本地”多核計算,而只是讓dask處理一切。因為如果您位于 dask 集群上,您甚至可能不知道每臺機器上有多少個核心(或者每個作業可能只獲得一個核心)。
這意味著,如果您的作業可以分發 N 次,并且每個作業將啟動 M 個子作業,您只需將“N x M”作業交給 dask 并讓它計算其余部分(包括數據局部性)。
添加回答
舉報
0/150
提交
取消