4 回答
TA貢獻1796條經驗 獲得超10個贊
是的,WaitGroup是正確的答案。根據doc?,您可以隨時使用WaitGroup.Add計數器大于零。
請注意,當計數器為零時發生的具有正增量的調用必須發生在等待之前。具有負增量的調用或在計數器大于零時開始的具有正增量的調用可能隨時發生。通常這意味著對 Add 的調用應該在創建 goroutine 或其他要等待的事件的語句之前執行。如果重復使用 WaitGroup 來等待多個獨立的事件集,則必須在所有先前的 Wait 調用返回后發生新的 Add 調用。請參閱 WaitGroup 示例。
Close但是一個技巧是,在調用之前,您應該始終保持計數器大于零。這通常意味著您應該調用wg.Addin?NewFoo(或類似的東西)并wg.Donein?Close.?并且為了防止多次調用Done破壞等待組,你應該包裝Close成sync.Once.?您可能還想防止Bar()調用 new。
TA貢獻1811條經驗 獲得超6個贊
我認為無限期地等待所有 go 例程完成不是正確的方法。如果其中一個 go routines 被阻塞或說它由于某種原因掛起并且從未成功終止,應該發生什么情況 kill 進程或等待 go routines 完成?
相反,無論所有例程是否已完成,您都應該等待一段時間并終止應用程序。
上下文包可用于向所有 go 例程發送信號以處理 kill 信號。
appCtx, cancel := context.WithCancel(context.Background())
這里 appCtx 必須傳遞給所有的 go 例程。
在退出信號調用cancel()。
作為 go 例程運行的函數可以處理如何處理取消上下文。
TA貢獻1853條經驗 獲得超18個贊
WaitGroup是一種方式,但是,Go 團隊errgroup完全針對您的用例引入了。leaf bebop 的回答中最不方便的部分是忽視錯誤處理。錯誤處理是存在的原因errgroup。慣用的 go 代碼不應該吞下錯誤。
但是,保留結構的簽名Foo(裝飾性的除外workerNumber)——并且沒有錯誤處理——我的建議如下所示:
package main
import (
"fmt"
"math/rand"
"time"
"golang.org/x/sync/errgroup"
)
type Foo struct {
errg errgroup.Group
}
func NewFoo() *Foo {
foo := &Foo{
errg: errgroup.Group{},
}
return foo
}
func (a *Foo) Bar(workerNumber int) {
a.errg.Go(func() error {
select {
// simulates the long running clals
case <-time.After(time.Second * time.Duration(rand.Intn(10))):
fmt.Println(fmt.Sprintf("worker %d completed its work", workerNumber))
return nil
}
})
}
func (a *Foo) Close() {
a.errg.Wait()
}
func main() {
foo := NewFoo()
for i := 0; i < 10; i++ {
foo.Bar(i)
}
<-time.After(time.Second * 5)
fmt.Println("Waiting for workers to complete...")
foo.Close()
fmt.Println("Done.")
}
這里的好處是,如果你在你的代碼中引入錯誤處理(你應該),你只需要稍微修改這段代碼:簡而言之,將返回errg.Wait()第一個 redis 錯誤,并且Close()可以通過堆棧向上傳播它(到 main,在這種情況下)。
也可以使用該context.Context包,如果調用失敗,您還可以立即取消任何正在運行的 redis 調用。文檔中有這方面的示例errgroup。
TA貢獻2039條經驗 獲得超8個贊
我經常使用的模式是:https ://play.golang.org/p/ibMz36TS62z
package main
import (
"fmt"
"sync"
"time"
)
type response struct {
message string
}
func task(i int, done chan response) {
time.Sleep(1 * time.Second)
done <- response{fmt.Sprintf("%d done", i)}
}
func main() {
responses := GetResponses(10)
fmt.Println("all done", len(responses))
}
func GetResponses(n int) []response {
donequeue := make(chan response)
wg := sync.WaitGroup{}
for i := 0; i < n; i++ {
wg.Add(1)
go func(value int) {
defer wg.Done()
task(value, donequeue)
}(i)
}
go func() {
wg.Wait()
close(donequeue)
}()
responses := []response{}
for result := range donequeue {
responses = append(responses, result)
}
return responses
}
這也使得節流變得容易:https ://play.golang.org/p/a4MKwJKj634
package main
import (
"fmt"
"sync"
"time"
)
type response struct {
message string
}
func task(i int, done chan response) {
time.Sleep(1 * time.Second)
done <- response{fmt.Sprintf("%d done", i)}
}
func main() {
responses := GetResponses(10, 2)
fmt.Println("all done", len(responses))
}
func GetResponses(n, concurrent int) []response {
throttle := make(chan int, concurrent)
for i := 0; i < concurrent; i++ {
throttle <- i
}
donequeue := make(chan response)
wg := sync.WaitGroup{}
for i := 0; i < n; i++ {
wg.Add(1)
<-throttle
go func(value int) {
defer wg.Done()
throttle <- 1
task(value, donequeue)
}(i)
}
go func() {
wg.Wait()
close(donequeue)
}()
responses := []response{}
for result := range donequeue {
responses = append(responses, result)
}
return responses
}
- 4 回答
- 0 關注
- 222 瀏覽
添加回答
舉報
