我們正在試驗Apache Beam(使用Go SDK)和Dataflow來并行化我們耗時的任務之一。為了更多的上下文,我們有緩存作業,它接受一些查詢,跨數據庫運行它并緩存它們。每個數據庫查詢可能需要幾秒鐘到幾分鐘的時間,我們希望并行運行這些查詢以更快地完成任務。創建了一個簡單的管道,如下所示: // Create initial PCollection. startLoad := beam.Create(s, "InitialLoadToStartPipeline") // Emits a unit of work along with query and date range. cachePayloads := beam.ParDo(s, &getCachePayloadsFn{Config: config}, startLoad) // Emits a cache response which includes errCode, errMsg, time etc. cacheResponses := beam.ParDo(s, &cacheQueryDoFn{Config: config}, cachePayloads) ...排放的數量單位不是很多,在生產中將主要以數百個和最多幾千個為單位。getCachePayloadsFn現在的問題是沒有并行執行,查詢是逐個按順序執行的。我們通過在緩存函數中輸入日志并記錄goroutine id,進程ID,開始和結束時間等來確認這一點,以確認執行中沒有重疊。cacheQueryDoFnStartBundleProcessElement我們希望始終并行運行查詢,即使只有 10 個查詢。根據我們的理解和文檔,它從整體輸入創建捆綁包,這些捆綁包并行運行,并且在捆綁包中按順序運行。有沒有辦法控制來自負載的捆綁包的數量,或者有沒有辦法增加并行度?我們嘗試過的事情:保留 和 。它啟動兩個 VM,但運行方法以僅在一個 VM 上初始化 DoFn,并將其用于整個負載。num_workers=2autoscaling_algorithm=NoneSetup在此處找到選項。但不知道如何正確設置它。已嘗試使用 設置它。無效果。sdk_worker_parallelismbeam.PipelineOptions.Set("sdk_worker_parallelism", "50")
1 回答

富國滬深
TA貢獻1790條經驗 獲得超9個贊
默認情況下,Create 不是并行的,并且所有 DoFn 都融合到與 Create 相同的階段中,因此它們也沒有并行性。有關此內容的詳細信息,請參閱 https://beam.apache.org/documentation/runtime/model/#dependent-parallellism。
您可以使用重新洗牌變換顯式強制融合中斷。
添加回答
舉報
0/150
提交
取消