亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何并行化遞歸函數

如何并行化遞歸函數

Go
MMMHUHU 2022-07-25 11:19:38
我正在嘗試并行化 Go 中的遞歸問題,但我不確定最好的方法是什么。我有一個遞歸函數,它的工作原理是這樣的:func recFunc(input string) (result []string) {    for subInput := range getSubInputs(input) {        subOutput := recFunc(subInput)        result = result.append(result, subOutput...)    }    result = result.append(result, getOutput(input)...)}func main() {    output := recFunc("some_input")    ...}因此,函數調用自身N時間(其中 N 在某個級別為 0),生成自己的輸出并返回列表中的所有內容?,F在我想讓這個函數并行運行。但我不確定最干凈的方法是什么。我的想法:有一個“結果”通道,所有函數調用都將結果發送到該通道。在 main 函數中收集結果。有一個等待組,它決定何時收集所有結果。問題:我需要等待等待組并并行收集所有結果。我可以為此啟動一個單獨的 go 函數,但是我如何退出這個單獨的 go 函數?func recFunc(input string) (result []string, outputChannel chan []string, waitGroup &sync.WaitGroup) {    defer waitGroup.Done()    waitGroup.Add(len(getSubInputs(input))    for subInput := range getSubInputs(input) {        go recFunc(subInput)    }    outputChannel <-getOutput(input)}func main() {    outputChannel := make(chan []string)    waitGroup := sync.WaitGroup{}    waitGroup.Add(1)    go recFunc("some_input", outputChannel, &waitGroup)    result := []string{}    go func() {       nextResult := <- outputChannel       result = append(result, nextResult ...)    }    waitGroup.Wait()}也許有更好的方法來做到這一點?或者我怎樣才能確保收集結果的匿名 go 函數在完成后被關閉?
查看完整描述

2 回答

?
瀟瀟雨雨

TA貢獻1833條經驗 獲得超4個贊

  • 遞歸算法應該對昂貴的資源(網絡連接、goroutine、堆棧空間等)有有限的限制。

  • 應該支持取消 - 以確保在不再需要結果時可以快速清理昂貴的操作

  • 分支遍歷應該支持錯誤報告;這允許錯誤在堆棧中冒泡并返回部分結果,而不會導致整個遞歸遍歷失敗。


對于異步結果 - 無論是否使用遞歸 - 建議使用通道。此外,對于具有許多 goroutine 的長時間運行的作業,提供一種取消方法(context.Context)以幫助清理。

由于遞歸會導致資源的指數消耗,因此設置限制很重要(請參閱有限并行)。

下面是我經常用于異步任務的設計模式:

  • 始終支持使用context.Context進行取消

  • 任務所需的工人數量

  • 返回 a chanof results & a chan error(只會返回一個錯誤或nil)

var (

    workers = 10

    ctx     = context.TODO() // use request context here - otherwise context.Background()

    input   = "abc"

)


resultC, errC := recJob(ctx, workers, input) // returns results & `error` channels


// asynchronous results - so read that channel first in the event of partial results ...

for r := range resultC {

    fmt.Println(r)

}


// ... then check for any errors

if err := <-errC; err != nil {

    log.Fatal(err)

}

遞歸:

由于遞歸快速水平擴展,因此需要一種一致的方法來填充有限的工人列表,同時確保工人被釋放時,他們可以迅速從其他(過度工作的)工人那里接手工作。

與其創建一個管理者層,不如使用一個協作的工作人員系統:

  • 每個工人共享一個輸入通道

  • 在遞歸輸入之前(subIinputs)檢查是否有任何其他工作人員處于空閑狀態

    • 如果是這樣,委托給那個工人

    • 如果不是,則當前工作人員繼續遞歸該分支

使用此算法,有限數量的工人很快就會因工作而飽和。任何提前完成其分支機構的工人 - 將很快被另一名工人委派一個子分支機構。最終所有的worker都將用完子分支,此時所有的worker都將被閑置(阻塞)并且遞歸任務可以完成。

為了實現這一點,需要進行一些仔細的協調。允許工作人員寫入輸入通道有助于通過委托進行這種對等協調?!斑f歸深度”WaitGroup用于跟蹤所有工作人員何時用盡所有分支。

(包括上下文支持和錯誤鏈接 - 我更新了您的getSubInputs函數以獲取ctx并返回可選error):

func recFunc(ctx context.Context, input string, in chan string, out chan<- string, rwg *sync.WaitGroup) error {


    defer rwg.Done() // decrement recursion count when a depth of recursion has completed


    subInputs, err := getSubInputs(ctx, input)

    if err != nil {

        return err

    }


    for subInput := range subInputs { 

        rwg.Add(1) // about to recurse (or delegate recursion)


        select {

        case in <- subInput:

            // delegated - to another goroutine


        case <-ctx.Done():

            // context canceled...


            // but first we need to undo the earlier `rwg.Add(1)`

            // as this work item was never delegated or handled by this worker

            rwg.Done()

            return ctx.Err()


        default:

            // noone available to delegate - so this worker will need to recurse this item themselves

            err = recFunc(ctx, subInput, in, out, rwg)

            if err != nil {

                return err

            }

        }


        select {

        case <-ctx.Done():

            // always check context when doing anything potentially blocking (in this case writing to `out`)

            // context canceled

            return ctx.Err()


        case out <- subInput:

        }

    }


    return nil

}

連接碎片:

recJob創建:

  • 輸入和輸出通道 - 由所有工人共享

  • “遞歸”WaitGroup檢測所有工作人員何時空閑

    • 然后可以安全地關閉“輸出”通道

  • 所有工作人員的錯誤通道

  • 通過將初始輸入寫入輸入通道來啟動遞歸工作負載

func recJob(ctx context.Context, workers int, input string) (resultsC <-chan string, errC <-chan error) {


    // RW channels

    out := make(chan string)

    eC := make(chan error, 1)


    // R-only channels returned to caller

    resultsC, errC = out, eC


    // create workers + waitgroup logic

    go func() {


        var err error // error that will be returned to call via error channel


        defer func() {

            close(out)

            eC <- err

            close(eC)

        }()


        var wg sync.WaitGroup

        wg.Add(1)

        in := make(chan string) // input channel: shared by all workers (to read from and also to write to when they need to delegate)


        workerErrC := createWorkers(ctx, workers, in, out, &wg)


        // get the ball rolling, pass input job to one of the workers

        // Note: must be done *after* workers are created - otherwise deadlock

        in <- input


        errCount := 0


        // wait for all worker error codes to return

        for err2 := range workerErrC {

            if err2 != nil {

                log.Println("worker error:", err2)

                errCount++

            }

        }


        // all workers have completed

        if errCount > 0 {

            err = fmt.Errorf("PARTIAL RESULT: %d of %d workers encountered errors", errCount, workers)

            return

        }


        log.Printf("All %d workers have FINISHED\n", workers)

    }()


    return

}

最后,創建工人:


func createWorkers(ctx context.Context, workers int, in chan string, out chan<- string, rwg *sync.WaitGroup) (errC <-chan error) {


    eC := make(chan error) // RW-version

    errC = eC              // RO-version (returned to caller)


    // track the completeness of the workers - so we know when to wrap up

    var wg sync.WaitGroup

    wg.Add(workers)


    for i := 0; i < workers; i++ {

        i := i

        go func() {

            defer wg.Done()


            var err error


            // ensure the current worker's return code gets returned

            // via the common workers' error-channel

            defer func() {

                if err != nil {

                    log.Printf("worker #%3d ERRORED: %s\n", i+1, err)

                } else {

                    log.Printf("worker #%3d FINISHED.\n", i+1)

                }

                eC <- err

            }()


            log.Printf("worker #%3d STARTED successfully\n", i+1)


            // worker scans for input

            for input := range in {


                err = recFunc(ctx, input, in, out, rwg)

                if err != nil {

                    log.Printf("worker #%3d recurseManagers ERROR: %s\n", i+1, err)

                    return

                }

            }


        }()

    }


    go func() {

        rwg.Wait() // wait for all recursion to finish

        close(in)  // safe to close input channel as all workers are blocked (i.e. no new inputs)

        wg.Wait()  // now wait for all workers to return

        close(eC)  // finally, signal to caller we're truly done by closing workers' error-channel

    }()


    return

}


查看完整回答
反對 回復 2022-07-25
?
慕哥6287543

TA貢獻1831條經驗 獲得超10個贊

我可以為此啟動一個單獨的 go 函數,但是我如何退出這個單獨的 go 函數?


您可以range在單獨的 go-routine 中通過輸出通道。在這種情況下,當通道關閉時,go-routine 將安全退出


go func() {

   for nextResult := range outputChannel {

     result = append(result, nextResult ...)

   }

}

所以,現在我們需要注意的是,在作為遞歸函數調用的一部分生成的所有 go-routines 成功存在之后,通道被關閉


為此,您可以在所有 go-routines 中使用共享的等待組,并在您的主函數中等待該等待組,就像您已經在做的那樣。一旦等待結束,關閉 outputChannel,讓其他 go-routine 也安全退出


func recFunc(input string, outputChannel chan, wg &sync.WaitGroup) {

    defer wg.Done()

    for subInput := range getSubInputs(input) {

        wg.Add(1)

        go recFunc(subInput)

    }

    outputChannel <-getOutput(input)

}


func main() {

    outputChannel := make(chan []string)

    waitGroup := sync.WaitGroup{}


    waitGroup.Add(1)

    go recFunc("some_input", outputChannel, &waitGroup)


    result := []string{}

    go func() {

     for nextResult := range outputChannel {

      result = append(result, nextResult ...)

     }

    }

    waitGroup.Wait()

    close(outputChannel)        

}

PS:如果你想有有限的并行性來限制指數增長,看看這個


查看完整回答
反對 回復 2022-07-25
  • 2 回答
  • 0 關注
  • 134 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號