3 回答

TA貢獻1951條經驗 獲得超3個贊
這是我的評論的后續內容,在您添加示例解決方案后添加。為了比我在注釋中更清晰,您的示例代碼實際上并沒有那么糟糕。以下是您的原始示例:
// Busywait solution
for {
select {
case <-time.After(to):
if cbOneDone && cbTwoDone {
fmt.Println("Both CB executed (we could poll more often)")
return nil
}
fmt.Println("Timeout!")
return fmt.Errorf("Timeout")
case <-cbOneDoneCh:
cbOneDone = true
case <-cbTwoDoneCh:
cbTwoDone = true
}
}
這不是一個“繁忙的等待”,但它確實有幾個錯誤(包括你需要一個只為已完成的通道發送語義的事實,或者可能更容易,至少同樣好,在完成時只關閉它們一次,也許使用)。我們想做的是:sync.Once
啟動計時器 作為超時。
to
使用計時器的通道和兩個“完成”通道輸入選擇循環。
我們希望在發生以下第一個事件時退出 select 循環:
計時器觸發,或
雙“完成”通道已發出信號。
如果我們要轉到兩個已完成的通道,我們也希望清除變量(設置為 ),以便選擇不會旋轉 - 這將變成真正的忙碌等待 - 但目前讓我們假設我們在回調時只發送一次,否則只會泄漏通道, 以便我們可以按照編寫的方式使用您的代碼,因為這些選擇只會返回一次。以下是更新后的代碼:close
Ch
nil
t := timer.NewTimer(to)
for !cbOneDone || !cbTwoDone {
select {
case <-t.C:
fmt.Println("Timeout!")
return fmt.Errorf("timeout")
}
case <-cbOneDoneCh:
cbOneDone = true
case <-cbTwoDoneCh:
cbTwoDone = true
}
}
// insert t.Stop() and receive here to drain t.C if desired
fmt.Println("Both CB executed")
return nil
請注意,我們最多將經歷兩次循環:
如果我們從兩個 Done 通道接收到每個通道,則循環停止而不會超時。沒有旋轉/忙碌等待:我們從未收到過任何東西。我們返回零(無錯誤)。
t.C
如果我們從一個 Done 通道接收,循環將恢復,但會阻塞等待計時器或另一個 Done 通道。
如果我們從 接收到 ,則表示我們尚未收到兩個回調。我們可能有一個,但有一個暫停,我們選擇放棄,這是我們的目標。我們返回一個錯誤,而不通過循環返回。
t.C
一個真正的版本需要更多的工作來正確清理并避免泄漏“完成”通道(以及計時器通道及其goroutine;參見評論),但這是一般的想法。您已經將回調轉換為通道操作,并且已經具有其通道的計時器。

TA貢獻1934條經驗 獲得超2個贊
下面的代碼有兩個變體,
第一個是常規模式,沒有什么花哨的,它做了工作,做得很好。您將回調啟動到例程中,使它們推送到接收器,收聽該接收器以獲取結果或超時。注意接收器通道的初始容量,為了防止泄漏例程,它必須與回調次數匹配。
第二個工廠將同步機制分解成小函數進行組裝,提供兩種等待方法,waitAll 和 waitOne。寫起來不錯,但效率肯定更低,分配更多,渠道更多來回,推理更復雜,更微妙。
package main
import (
"fmt"
"log"
"sync"
"time"
)
func main() {
ExampleOne()
ExampleTwo()
ExampleThree()
fmt.Println("Hello, playground")
}
func ExampleOne() {
log.Println("start reg")
errs := make(chan error, 2)
go func() {
fn := callbackWithOpts("reg: so slow", 2*time.Second, nil)
errs <- fn()
}()
go func() {
fn := callbackWithOpts("reg: too fast", time.Millisecond, fmt.Errorf("broke!"))
errs <- fn()
}()
select {
case err := <-errs: // capture only one result,
// the fastest to finish.
if err != nil {
log.Println(err)
}
case <-time.After(time.Second): // or wait that many amount of time,
// in case they are all so slow.
}
log.Println("done reg")
}
func ExampleTwo() {
log.Println("start wait")
errs := waitAll(
withTimeout(time.Second,
callbackWithOpts("waitAll: so slow", 2*time.Second, nil),
),
withTimeout(time.Second,
callbackWithOpts("waitAll: too fast", time.Millisecond, nil),
),
)
for err := range trim(errs) {
if err != nil {
log.Println(err)
}
}
log.Println("done wait")
}
func ExampleThree() {
log.Println("start waitOne")
errs := waitOne(
withTimeout(time.Second,
callbackWithOpts("waitOne: so slow", 2*time.Second, nil),
),
withTimeout(time.Second,
callbackWithOpts("waitOne: too fast", time.Millisecond, nil),
),
)
for err := range trim(errs) {
if err != nil {
log.Println(err)
}
}
log.Println("done waitOne")
}
// a configurable callback for playing
func callbackWithOpts(msg string, tout time.Duration, err error) func() error {
return func() error {
<-time.After(tout)
fmt.Println(msg)
return err
}
}
// withTimeout return a function that returns first error or times out and return nil
func withTimeout(tout time.Duration, h func() error) func() error {
return func() error {
d := make(chan error, 1)
go func() {
d <- h()
}()
select {
case err := <-d:
return err
case <-time.After(tout):
}
return nil
}
}
// wait launches all func() and return their errors into the returned error channel; (merge)
// It is the caller responsability to drain the output error channel.
func waitAll(h ...func() error) chan error {
d := make(chan error, len(h))
var wg sync.WaitGroup
for i := 0; i < len(h); i++ {
wg.Add(1)
go func(h func() error) {
defer wg.Done()
d <- h()
}(h[i])
}
go func() {
wg.Wait()
close(d)
}()
return d
}
// wait launches all func() and return the first error into the returned error channel
// It is the caller responsability to drain the output error channel.
func waitOne(h ...func() error) chan error {
d := make(chan error, len(h))
one := make(chan error, 1)
var wg sync.WaitGroup
for i := 0; i < len(h); i++ {
wg.Add(1)
go func(h func() error) {
defer wg.Done()
d <- h()
}(h[i])
}
go func() {
for err := range d {
one <- err
close(one)
break
}
}()
go func() {
wg.Wait()
close(d)
}()
return one
}
func trim(err chan error) chan error {
out := make(chan error)
go func() {
for e := range err {
out <- e
}
close(out)
}()
return out
}

TA貢獻1864條經驗 獲得超2個贊
func wait(ctx context.Context, wg *sync.WaitGroup) error {
done := make(chan struct{}, 1)
go func() {
wg.Wait()
done <- struct{}{}
}()
select {
case <-done:
// Counter is 0, so all callbacks completed.
return nil
case <-ctx.Done():
// Context cancelled.
return ctx.Err()
}
}
或者,您可以傳遞 a 和 塊而不是 on ,但我認為使用上下文更習慣用語。time.Duration<-time.After(d)<-ctx.Done()
- 3 回答
- 0 關注
- 97 瀏覽
添加回答
舉報