1 回答

TA貢獻2065條經驗 獲得超14個贊
使用 async.WaitGroup來跟蹤正在運行的 goroutines 的數量。每個 goroutine 在不再從通道獲取數據后退出。一旦WaitGroup完成,清理就可以完成了。
是這樣的:
import (
"sync"
"time"
)
type Data interface{} // just an example
type Consumer interface {
Consume(Data) Data
CleanUp()
Count() int
Timeout() time.Duration
}
func StartConsumers(consumer Consumer, inCh <-chan Data, outCh chan<- Data) {
wg := sync.WaitGroup{}
for i := 0; i < consumer.Count(); i++ {
wg.Add(1)
go func() {
consumeLoop:
for {
select {
case v, ok := <-inCh: // 'ok' says if the channel is still open
if !ok {
break consumeLoop
}
outCh <- consumer.Consume(v)
case <-time.After(consumer.Timeout()):
break consumeLoop
}
}
wg.Done()
}()
}
wg.Wait()
consumer.CleanUp()
close(outCh)
}
在管道的每個階段,您都可以使用與上述類似的過程來啟動消費者。
- 1 回答
- 0 關注
- 116 瀏覽
添加回答
舉報