亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

使用 Ray 并行化大型程序的正確方法

使用 Ray 并行化大型程序的正確方法

四季花海 2023-06-13 16:08:12
我有一個相當大的 Python 程序(~800 行),它具有以下結構:設置說明,我在其中處理用戶提供的輸入文件并定義對程序執行具有全局性的變量/對象。主要功能,它利用前面的設置階段并調用程序的主要附加功能。附加函數可以是主要的,因為它們被主函數直接調用,或者是次要的,因為它們只被主要的附加函數調用。我處理 main 函數結果的最后幾行代碼。該程序是大規模并行的,因為主函數的每次執行都獨立于前一個和下一個。因此,我使用 Ray 在集群中的多個工作節點上并行執行主要功能。操作系統是 CentOS Linux release 8.2.2004 (Core),集群執行 PBS Pro 19.2.4.20190830141245。我正在使用 Python 3.7.4、Ray 0.8.7 和 Redis 3.4.1。我在 Python 腳本中有以下內容,foo主要功能在哪里:@ray.remote(memory=2.5 * 1024 * 1024 * 1024)def foo(locInd):    # Main functionif __name__ == '__main__':    ray.init(address='auto', redis_password=args.pw,             driver_object_store_memory=10 * 1024 * 1024 * 1024)    futures = [foo.remote(i) for i in zip(*np.asarray(indArr == 0).nonzero())]    waitingIds = list(futures)    while len(waitingIds) > 0:        readyIds, waitingIds = ray.wait(            waitingIds, num_returns=min([checkpoint, len(waitingIds)]))        for r0, r1, r2, r3, r4, r5, r6, r7 in ray.get(readyIds):            # Process results            indArr[r0[::-1]] = 1            nodesComplete += 1    ray.shutdown()以下是我用來啟動 Ray 的說明# Head node/path/to/ray start --head --port=6379 \--redis-password=$redis_password \--memory $((120 * 1024 * 1024 * 1024)) \--object-store-memory $((20 * 1024 * 1024 * 1024)) \--redis-max-memory $((10 * 1024 * 1024 * 1024)) \--num-cpus 48 --num-gpus 0只要我處理足夠小的數據集,一切都會按預期運行。盡管如此,執行會產生以下警告2020-08-17 17:16:44,289 警告 worker.py:1134 -- 警告:腌制時遠程函數的__main__.foo大小為 220019409。它將存儲在 Redis 中,這可能會導致內存問題。這可能意味著它的定義使用了一個大數組或其他對象。2020-08-17 17:17:10,281 WARNING worker.py:1134 -- 這個 worker 被要求執行一個它沒有注冊的函數。您可能需要重新啟動 Ray。關于我如何向 Ray 描述程序,我顯然做錯了什么。我有 Scipy Interpolator 對象,我認為它們是全局的,但是,正如在這個 GitHub線程中已經指出的那樣,我應該調用ray.put它們。問題是我遇到了這些ValueError: buffer source array is read-only我不知道如何診斷的問題。另外,我不確定是否應該用主要功能裝飾所有功能@ray.remote或只裝飾主要功能。我想我可以@ray.remote(num_cpus=1)為所有附加功能做,因為它實際上只應該是并行執行的主要功能,但我不知道這是否有意義。
查看完整描述

1 回答

?
慕田峪4524236

TA貢獻1875條經驗 獲得超5個贊

正如在問題中提到的,該程序對于足夠小的數據集運行得很好(盡管它似乎繞過了 Ray 邏輯的幾個方面),但它最終在大型數據集上崩潰了。僅使用 Ray 任務,我沒有設法調用存儲在 Object Store ( ValueError: buffer source array is read-only) 中的 Scipy Interpolator 對象,并且裝飾所有函數沒有意義,因為實際上只有主要函數應該同時執行(同時調用其他函數)。


因此,我決定更改程序結構以使用 Ray Actors。設置說明現在是該__init__方法的一部分。特別是,Scipy Interpolator 對象在此方法中定義并設置為 的屬性self,就像全局變量一樣。大多數函數(包括 main 函數)已成為類方法,但通過 Numba 編譯的函數除外。對于后者,它們仍然是用 裝飾的獨立函數@jit,但它們中的每一個現在在調用 jitted 函數的類中都有一個等效的包裝方法。


為了讓我的程序并行執行我現在的主要方法,我依賴于 ActorPool。我創建了與可用 CPU 一樣多的 actor,每個 actor 都執行 main 方法,成功調用方法和 Numba 編譯的函數,同時還設法訪問 Interpolator 對象。我只適用@ray.remote于定義的 Python 類。所有這些都轉化為以下結構:


@ray.remote

class FooClass(object):

? ? def __init__(self, initArgs):

? ? ? ? # Initialisation


? ? @staticmethod

? ? def exampleStaticMethod(args):

? ? ? ? # Processing

? ? ? ? return


? ? def exampleMethod(self, args):

? ? ? ? # Processing

? ? ? ? return


? ? def exampleWrapperMethod(self, args):

? ? ? ? return numbaCompiledFunction(args)


? ? def mainMethod(self, poolMapArgs):

? ? ? ? # Processing

? ? ? ? return



@jit

def numbaCompiledFunction(args):

? ? # Processing

? ? return



ray.init(address='auto', redis_password=redPass)

actors = []

for actor in range(int(ray.cluster_resources()['CPU'])):

? ? actors.append(FooClass.remote(initArgs))

pool = ActorPool(actors)

for unpackedTuple in pool.map_unordered(

? ? ? ? lambda a, v: a.mainMethod.remote(v),

? ? ? ? poolMapArgs):

? ? # Processing

ray.shutdown()

這在分布在 4 個節點上的 192 個 CPU 上成功運行,沒有任何警告或錯誤。


查看完整回答
反對 回復 2023-06-13
  • 1 回答
  • 0 關注
  • 215 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號