亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Dask - 是否可以通過自定義函數使用每個工作線程中的所有線程?

Dask - 是否可以通過自定義函數使用每個工作線程中的所有線程?

開心每一天1111 2023-06-27 18:11:17
就我而言,我在 S3 中有多個文件和一個自定義函數,該函數讀取每個文件并使用所有線程處理它。為了簡化示例,我只生成一個數據幀df,并且假設我的函數是tsfresh.extract_features使用多重處理。生成數據import pandas as pdfrom tsfresh import extract_featuresfrom tsfresh.examples.robot_execution_failures import download_robot_execution_failures, \load_robot_execution_failuresdownload_robot_execution_failures()ts, y = load_robot_execution_failures()df = []for i in range(5):    tts = ts.copy()    tts["id"] += 88 * i    df.append(tts)    df = pd.concat(df, ignore_index=True)功能def fun(df, n_jobs):    extracted_features = extract_features(df,                                      column_id="id",                                      column_sort="time",                                      n_jobs=n_jobs)簇import daskfrom dask.distributed import Client, progressfrom dask import compute, delayedfrom dask_cloudprovider import FargateClustermy_vpc = # your vpcmy_subnets = # your subnetscpu = 2 ram = 4cluster = FargateCluster(n_workers=1,                         image='rpanai/feats-worker:2020-08-24',                         vpc=my_vpc,                         subnets=my_subnets,                         worker_cpu=int(cpu * 1024),                         worker_mem=int(ram * 1024),                         cloudwatch_logs_group="my_log_group",                         task_role_policies=['arn:aws:iam::aws:policy/AmazonS3FullAccess'],                         scheduler_timeout='20 minutes'                        )cluster.adapt(minimum=1,              maximum=4)client = Client(cluster)client使用所有工作線程(失敗)to_process = [delayed(fun)(df, cpu) for i in range(10)]out = compute(to_process)AssertionError: daemonic processes are not allowed to have children僅使用一個線程(OK)在這種情況下,它工作正常,但我浪費資源。to_process = [delayed(fun)(df, 0) for i in range(10)]out = compute(to_process)問題我知道對于這個特定的功能,我最終可以使用多線程和其他一些技巧編寫一個自定義分配器,但我想分配一個工作,讓每個工作人員都可以利用所有資源,而不必擔心太多。
查看完整描述

1 回答

?
元芳怎么了

TA貢獻1798條經驗 獲得超7個贊

我可以幫助回答您的具體問題tsfresh,但 iftsfresh只是一個簡單的玩具示例,可能不是您想要的。

對于tsfresh,您通常不會混合使用tsfreshdask 和 dask 的多重處理,而是讓 dask 執行所有處理。這意味著,您從一個單一的開始dask.DataFrame(在您的測試用例中,您可以將 pandas 數據幀轉換為 dask 數據幀 - 對于您的讀取用例,您可以直接從S3?docu讀?。?,然后在 dask 數據幀中分發特征提?。ㄌ卣魈崛〉暮锰幨牵诿總€時間序列上獨立工作。因此我們可以為每個時間序列生成一個作業)。


我不確定這是否有助于解決您更普遍的問題。在我看來,你(在大多數情況下)不想混合dask的分布函數和“本地”多核計算,而只是讓dask處理一切。因為如果您位于 dask 集群上,您甚至可能不知道每臺機器上有多少個核心(或者每個作業可能只獲得一個核心)。

這意味著,如果您的作業可以分發 N 次,并且每個作業將啟動 M 個子作業,您只需將“N x M”作業交給 dask 并讓它計算其余部分(包括數據局部性)。


查看完整回答
反對 回復 2023-06-27
  • 1 回答
  • 0 關注
  • 139 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號