我有一個使用 Dask 編寫的大型數據提取作業,其中每個任務將從數十個數據庫的大量表中查詢一個表。對于每個數據庫實例,我想限制一次連接的任務數量(即限制)。例如,我可能有 100 個任務連接到數據庫 A,100 個任務連接到數據庫 B,100 個任務連接到數據庫 C,等等,并且我想確保在任何給定時間連接到任何數據庫的任務不超過 20 個。我發現 Dask 提供了基于工作線程資源(CPU、MEM、GPU 等)的約束,但是數據庫資源是“全局”的,因此對于任何 Dask 工作線程來說都不是特定的。Dask 是否提供任何方法來對任務并發性的此類約束進行建模?
1 回答

幕布斯6054654
TA貢獻1876條經驗 獲得超7個贊
Dask 提供分布式信號量,可以限制對數據庫等資源的并發訪問。
例子
import time
from dask.distributed import Client, Semaphore
client = Client(...)
def do_task(x, sem):
? ? with sem:
? ? ? ? time.sleep(5)
? ? ? ? return x
# allow no more than 5 tasks to run concurrently
sem = Semaphore(max_leases=5, name="Limiter")
# submit jobs that use the semaphore
futures = client.map(do_task, range(20), sem=sem)
# collect results
results = client.gather(futures)
添加回答
舉報
0/150
提交
取消