亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何協調多個 goroutine 的關閉

如何協調多個 goroutine 的關閉

Go
米脂 2023-06-05 17:22:05
說我有一個功能type Foo struct {}func (a *Foo) Bar() {    // some expensive work - does some calls to redis}它在我的應用程序的某個時刻在 goroutine 中執行。許多這些可能在任何給定點執行。在應用程序終止之前,我想確保所有剩余的 goroutines 都已完成它們的工作。我可以做這樣的事情嗎:type Foo struct {    wg sync.WaitGroup}func (a *Foo) Close() {    a.wg.Wait()}func (a *Foo) Bar() {    a.wg.Add(1)    defer a.wg.Done()    // some expensive work - does some calls to redis}假設 Bar 在 goroutine 中執行,其中許多可能在給定時間運行,并且一旦調用 Close 并且在 sigterm 或 sigint 上調用 Close 時不應調用 Bar。這有意義嗎?通常我會看到 Bar 函數如下所示:func (a *Foo) Bar() {    a.wg.Add(1)    go func() {        defer a.wg.Done()        // some expensive work - does some calls to redis    }()}
查看完整描述

4 回答

?
白衣染霜花

TA貢獻1796條經驗 獲得超10個贊

是的,WaitGroup是正確的答案。根據doc?,您可以隨時使用WaitGroup.Add計數器大于零。

請注意,當計數器為零時發生的具有正增量的調用必須發生在等待之前。具有負增量的調用或在計數器大于零時開始的具有正增量的調用可能隨時發生。通常這意味著對 Add 的調用應該在創建 goroutine 或其他要等待的事件的語句之前執行。如果重復使用 WaitGroup 來等待多個獨立的事件集,則必須在所有先前的 Wait 調用返回后發生新的 Add 調用。請參閱 WaitGroup 示例。

Close但是一個技巧是,在調用之前,您應該始終保持計數器大于零。這通常意味著您應該調用wg.Addin?NewFoo(或類似的東西)并wg.Donein?Close.?并且為了防止多次調用Done破壞等待組,你應該包裝Closesync.Once.?您可能還想防止Bar()調用 new。


查看完整回答
反對 回復 2023-06-05
?
楊魅力

TA貢獻1811條經驗 獲得超6個贊

我認為無限期地等待所有 go 例程完成不是正確的方法。如果其中一個 go routines 被阻塞或說它由于某種原因掛起并且從未成功終止,應該發生什么情況 kill 進程或等待 go routines 完成?

相反,無論所有例程是否已完成,您都應該等待一段時間并終止應用程序。


上下文包可用于向所有 go 例程發送信號以處理 kill 信號。

appCtx, cancel := context.WithCancel(context.Background())

這里 appCtx 必須傳遞給所有的 go 例程。

在退出信號調用cancel()

作為 go 例程運行的函數可以處理如何處理取消上下文。

查看完整回答
反對 回復 2023-06-05
?
慕容森

TA貢獻1853條經驗 獲得超18個贊

WaitGroup是一種方式,但是,Go 團隊errgroup完全針對您的用例引入了。leaf bebop 的回答中最不方便的部分是忽視錯誤處理。錯誤處理是存在的原因errgroup。慣用的 go 代碼不應該吞下錯誤。


但是,保留結構的簽名Foo(裝飾性的除外workerNumber)——并且沒有錯誤處理——我的建議如下所示:


package main


import (

    "fmt"

    "math/rand"

    "time"


    "golang.org/x/sync/errgroup"

)


type Foo struct {

    errg errgroup.Group

}


func NewFoo() *Foo {

    foo := &Foo{

        errg: errgroup.Group{},

    }

    return foo

}


func (a *Foo) Bar(workerNumber int) {

    a.errg.Go(func() error {

        select {

        // simulates the long running clals

        case <-time.After(time.Second * time.Duration(rand.Intn(10))):

            fmt.Println(fmt.Sprintf("worker %d completed its work", workerNumber))

            return nil

        }

    })

}


func (a *Foo) Close() {

    a.errg.Wait()

}


func main() {

    foo := NewFoo()


    for i := 0; i < 10; i++ {

        foo.Bar(i)

    }


    <-time.After(time.Second * 5)

    fmt.Println("Waiting for workers to complete...")

    foo.Close()

    fmt.Println("Done.")

}

這里的好處是,如果你在你的代碼中引入錯誤處理(你應該),你只需要稍微修改這段代碼:簡而言之,將返回errg.Wait()第一個 redis 錯誤,并且Close()可以通過堆棧向上傳播它(到 main,在這種情況下)。


也可以使用該context.Context包,如果調用失敗,您還可以立即取消任何正在運行的 redis 調用。文檔中有這方面的示例errgroup。


查看完整回答
反對 回復 2023-06-05
?
largeQ

TA貢獻2039條經驗 獲得超8個贊

我經常使用的模式是:https ://play.golang.org/p/ibMz36TS62z

package main


import (

    "fmt"

    "sync"

    "time"

)


type response struct {

    message string

}


func task(i int, done chan response) {

    time.Sleep(1 * time.Second)

    done <- response{fmt.Sprintf("%d done", i)}

}


func main() {


    responses := GetResponses(10)


    fmt.Println("all done", len(responses))

}


func GetResponses(n int) []response {

    donequeue := make(chan response)

    wg := sync.WaitGroup{}

    for i := 0; i < n; i++ {

        wg.Add(1)

        go func(value int) {

            defer wg.Done()

            task(value, donequeue)

        }(i)

    }

    go func() {

        wg.Wait()

        close(donequeue)

    }()

    responses := []response{}

    for result := range donequeue {

        responses = append(responses, result)

    }


    return responses

}

這也使得節流變得容易:https ://play.golang.org/p/a4MKwJKj634

package main


import (

    "fmt"

    "sync"

    "time"

)


type response struct {

    message string

}


func task(i int, done chan response) {

    time.Sleep(1 * time.Second)

    done <- response{fmt.Sprintf("%d done", i)}

}


func main() {


    responses := GetResponses(10, 2)


    fmt.Println("all done", len(responses))

}


func GetResponses(n, concurrent int) []response {


    throttle := make(chan int, concurrent)

    for i := 0; i < concurrent; i++ {

        throttle <- i

    }

    donequeue := make(chan response)

    wg := sync.WaitGroup{}

    for i := 0; i < n; i++ {

        wg.Add(1)

        <-throttle

        go func(value int) {

            defer wg.Done()

            throttle <- 1

            task(value, donequeue)

        }(i)

    }

    go func() {

        wg.Wait()

        close(donequeue)

    }()

    responses := []response{}

    for result := range donequeue {

        responses = append(responses, result)

    }


    return responses

}


查看完整回答
反對 回復 2023-06-05
  • 4 回答
  • 0 關注
  • 222 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號