亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何并行化遞歸函數

如何并行化遞歸函數

Go
慕妹3146593 2022-08-01 11:00:39
我試圖在 Go 中并行化遞歸問題,我不確定最好的方法是什么。我有一個遞歸函數,它的工作原理如下:func recFunc(input string) (result []string) {    for subInput := range getSubInputs(input) {        subOutput := recFunc(subInput)        result = result.append(result, subOutput...)    }    result = result.append(result, getOutput(input)...)}func main() {    output := recFunc("some_input")    ...}因此,該函數調用自身時間(其中N在某個級別為0),生成自己的輸出并返回列表中的所有內容。N現在我想讓這個函數并行運行。但我不確定最干凈的方法來做到這一點。我的想法:有一個“結果”通道,所有函數調用都向該通道發送其結果。在 main 函數中收集結果。有一個等待組,用于確定何時收集所有結果。問題:我需要等待等待組并并行收集所有結果。我可以為此啟動一個單獨的 go 函數,但是我該如何退出這個單獨的 go 函數呢?func recFunc(input string) (result []string, outputChannel chan []string, waitGroup &sync.WaitGroup) {    defer waitGroup.Done()    waitGroup.Add(len(getSubInputs(input))    for subInput := range getSubInputs(input) {        go recFunc(subInput)    }    outputChannel <-getOutput(input)}func main() {    outputChannel := make(chan []string)    waitGroup := sync.WaitGroup{}    waitGroup.Add(1)    go recFunc("some_input", outputChannel, &waitGroup)    result := []string{}    go func() {       nextResult := <- outputChannel       result = append(result, nextResult ...)    }    waitGroup.Wait()}也許有更好的方法來做到這一點?或者,我如何確保收集結果的匿名 go 函數在完成時被截斷?
查看完整描述

2 回答

?
小唯快跑啊

TA貢獻1863條經驗 獲得超2個贊

博士;

  • 遞歸算法應該對昂貴的資源(網絡連接、goroutines、堆棧空間等)有限制。

  • 應支持取消 - 以確保在不再需要結果時可以快速清理昂貴的操作

  • 分支遍歷應支持錯誤報告;這允許錯誤冒泡堆棧并返回部分結果,而不會使整個遞歸遍歷失敗。


對于同步結果 ( 無論是否使用遞歸 - 建議使用通道。此外,對于具有許多 goroutine 的長時間運行的作業,請提供取消方法(上下文。上下文)以幫助清理。

由于遞歸可能導致資源呈指數級消耗,因此設置限制非常重要(請參閱有界并行性)。

以下是我經常用于異步任務的設計模式:

  • 始終支持采取上下文。取消的上下文

  • 任務所需的工作人員數

  • 返回 a 的結果和 a(將只返回一個錯誤或chanchan errornil)

var (

    workers = 10

    ctx     = context.TODO() // use request context here - otherwise context.Background()

    input   = "abc"

)


resultC, errC := recJob(ctx, workers, input) // returns results & `error` channels


// asynchronous results - so read that channel first in the event of partial results ...

for r := range resultC {

    fmt.Println(r)

}


// ... then check for any errors

if err := <-errC; err != nil {

    log.Fatal(err)

}

遞歸:

由于遞歸可以快速水平擴展,因此需要一種一致的方式來填充有限的工人列表,同時還要確保當工人被釋放時,他們能夠快速從其他(過度工作)工人那里接手工作。

與其創建經理層,不如采用合作的同事對等系統:

  • 每個工作線程共享一個輸入通道

  • 在輸入上遞歸之前 () 檢查是否有任何其他工作線程處于空閑狀態subIinputs

    • 如果是這樣,請委派給該工作人員

    • 如果不是,則當前工作線程繼續遞歸該分支

有了這個算法,有限的工人數量很快就會被工作所淹沒。任何提前完成分支的工人 - 將很快從另一個工人那里獲得子分支。最終,所有工作線程都將用完子分支,此時所有工作線程都將空閑(阻止),遞歸任務可以完成。

要實現這一目標,需要進行一些認真的協調。允許工作線程寫入輸入通道有助于通過委派進行這種對等協調?!斑f歸深度”用于跟蹤所有工作線程的所有分支何時耗盡。WaitGroup

(為了包括上下文支持和錯誤鏈接 - 我更新了您的函數以采用a并返回可選):getSubInputsctxerror

func recFunc(ctx context.Context, input string, in chan string, out chan<- string, rwg *sync.WaitGroup) error {


    defer rwg.Done() // decrement recursion count when a depth of recursion has completed


    subInputs, err := getSubInputs(ctx, input)

    if err != nil {

        return err

    }


    for subInput := range subInputs { 

        rwg.Add(1) // about to recurse (or delegate recursion)


        select {

        case in <- subInput:

            // delegated - to another goroutine


        case <-ctx.Done():

            // context canceled...


            // but first we need to undo the earlier `rwg.Add(1)`

            // as this work item was never delegated or handled by this worker

            rwg.Done()

            return ctx.Err()


        default:

            // noone available to delegate - so this worker will need to recurse this item themselves

            err = recFunc(ctx, subInput, in, out, rwg)

            if err != nil {

                return err

            }

        }


        select {

        case <-ctx.Done():

            // always check context when doing anything potentially blocking (in this case writing to `out`)

            // context canceled

            return ctx.Err()


        case out <- subInput:

        }

    }


    return nil

}

連接工件:

recJob創建:

  • 輸入和輸出通道 - 由所有工人共享

  • “遞歸”檢測所有工作線程何時空閑WaitGroup

    • 然后可以安全地關閉“輸出”通道

  • 所有工作線程的錯誤通道

  • 通過將初始輸入寫入輸入通道來啟動遞歸工作負載

func recJob(ctx context.Context, workers int, input string) (resultsC <-chan string, errC <-chan error) {


    // RW channels

    out := make(chan string)

    eC := make(chan error, 1)


    // R-only channels returned to caller

    resultsC, errC = out, eC


    // create workers + waitgroup logic

    go func() {


        var err error // error that will be returned to call via error channel


        defer func() {

            close(out)

            eC <- err

            close(eC)

        }()


        var wg sync.WaitGroup

        wg.Add(1)

        in := make(chan string) // input channel: shared by all workers (to read from and also to write to when they need to delegate)


        workerErrC := createWorkers(ctx, workers, in, out, &wg)


        // get the ball rolling, pass input job to one of the workers

        // Note: must be done *after* workers are created - otherwise deadlock

        in <- input


        errCount := 0


        // wait for all worker error codes to return

        for err2 := range workerErrC {

            if err2 != nil {

                log.Println("worker error:", err2)

                errCount++

            }

        }


        // all workers have completed

        if errCount > 0 {

            err = fmt.Errorf("PARTIAL RESULT: %d of %d workers encountered errors", errCount, workers)

            return

        }


        log.Printf("All %d workers have FINISHED\n", workers)

    }()


    return

}

最后,創建工作線程:


func createWorkers(ctx context.Context, workers int, in chan string, out chan<- string, rwg *sync.WaitGroup) (errC <-chan error) {


    eC := make(chan error) // RW-version

    errC = eC              // RO-version (returned to caller)


    // track the completeness of the workers - so we know when to wrap up

    var wg sync.WaitGroup

    wg.Add(workers)


    for i := 0; i < workers; i++ {

        i := i

        go func() {

            defer wg.Done()


            var err error


            // ensure the current worker's return code gets returned

            // via the common workers' error-channel

            defer func() {

                if err != nil {

                    log.Printf("worker #%3d ERRORED: %s\n", i+1, err)

                } else {

                    log.Printf("worker #%3d FINISHED.\n", i+1)

                }

                eC <- err

            }()


            log.Printf("worker #%3d STARTED successfully\n", i+1)


            // worker scans for input

            for input := range in {


                err = recFunc(ctx, input, in, out, rwg)

                if err != nil {

                    log.Printf("worker #%3d recurseManagers ERROR: %s\n", i+1, err)

                    return

                }

            }


        }()

    }


    go func() {

        rwg.Wait() // wait for all recursion to finish

        close(in)  // safe to close input channel as all workers are blocked (i.e. no new inputs)

        wg.Wait()  // now wait for all workers to return

        close(eC)  // finally, signal to caller we're truly done by closing workers' error-channel

    }()


    return

}


查看完整回答
反對 回復 2022-08-01
?
元芳怎么了

TA貢獻1798條經驗 獲得超7個贊

我可以為此啟動一個單獨的 go 函數,但是我該如何退出這個單獨的 go 函數呢?


您可以在單獨的 go-routine 中通過輸出通道。在這種情況下,當通道關閉時,go-routine將安全退出range


go func() {

   for nextResult := range outputChannel {

     result = append(result, nextResult ...)

   }

}

因此,現在我們需要注意的是,在作為遞歸函數調用的一部分生成的所有go-routine都成功存在之后,通道被關閉。


為此,您可以在所有 go 例程中使用共享等待組,并在主函數中等待該等待組,就像您已經在做的那樣。等待結束后,關閉輸出通道,以便其他 go-routine 也安全退出


func recFunc(input string, outputChannel chan, wg &sync.WaitGroup) {

    defer wg.Done()

    for subInput := range getSubInputs(input) {

        wg.Add(1)

        go recFunc(subInput)

    }

    outputChannel <-getOutput(input)

}


func main() {

    outputChannel := make(chan []string)

    waitGroup := sync.WaitGroup{}


    waitGroup.Add(1)

    go recFunc("some_input", outputChannel, &waitGroup)


    result := []string{}

    go func() {

     for nextResult := range outputChannel {

      result = append(result, nextResult ...)

     }

    }

    waitGroup.Wait()

    close(outputChannel)        

}

PS:如果你想有界并行度來限制指數增長,請查看這個


查看完整回答
反對 回復 2022-08-01
  • 2 回答
  • 0 關注
  • 163 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號