1 回答

TA貢獻1875條經驗 獲得超5個贊
正如在問題中提到的,該程序對于足夠小的數據集運行得很好(盡管它似乎繞過了 Ray 邏輯的幾個方面),但它最終在大型數據集上崩潰了。僅使用 Ray 任務,我沒有設法調用存儲在 Object Store ( ValueError: buffer source array is read-only) 中的 Scipy Interpolator 對象,并且裝飾所有函數沒有意義,因為實際上只有主要函數應該同時執行(同時調用其他函數)。
因此,我決定更改程序結構以使用 Ray Actors。設置說明現在是該__init__方法的一部分。特別是,Scipy Interpolator 對象在此方法中定義并設置為 的屬性self,就像全局變量一樣。大多數函數(包括 main 函數)已成為類方法,但通過 Numba 編譯的函數除外。對于后者,它們仍然是用 裝飾的獨立函數@jit,但它們中的每一個現在在調用 jitted 函數的類中都有一個等效的包裝方法。
為了讓我的程序并行執行我現在的主要方法,我依賴于 ActorPool。我創建了與可用 CPU 一樣多的 actor,每個 actor 都執行 main 方法,成功調用方法和 Numba 編譯的函數,同時還設法訪問 Interpolator 對象。我只適用@ray.remote于定義的 Python 類。所有這些都轉化為以下結構:
@ray.remote
class FooClass(object):
? ? def __init__(self, initArgs):
? ? ? ? # Initialisation
? ? @staticmethod
? ? def exampleStaticMethod(args):
? ? ? ? # Processing
? ? ? ? return
? ? def exampleMethod(self, args):
? ? ? ? # Processing
? ? ? ? return
? ? def exampleWrapperMethod(self, args):
? ? ? ? return numbaCompiledFunction(args)
? ? def mainMethod(self, poolMapArgs):
? ? ? ? # Processing
? ? ? ? return
@jit
def numbaCompiledFunction(args):
? ? # Processing
? ? return
ray.init(address='auto', redis_password=redPass)
actors = []
for actor in range(int(ray.cluster_resources()['CPU'])):
? ? actors.append(FooClass.remote(initArgs))
pool = ActorPool(actors)
for unpackedTuple in pool.map_unordered(
? ? ? ? lambda a, v: a.mainMethod.remote(v),
? ? ? ? poolMapArgs):
? ? # Processing
ray.shutdown()
這在分布在 4 個節點上的 192 個 CPU 上成功運行,沒有任何警告或錯誤。
添加回答
舉報