1 回答

TA貢獻1786條經驗 獲得超13個贊
這是您的代碼的基于通道的版本,在功能上等同于上面示例的意圖。關鍵點是我們沒有使用任何原子值來改變代碼的邏輯,因為這不提供 goroutine 之間的同步。goroutine 之間的所有交互都使用通道sync.WaitGroup、 或同步context.Context??赡苡懈玫姆椒▉斫鉀Q手頭的問題,但這表明沒有必要的原子來協調隊列和工作人員。
這里唯一在 goroutines 之間仍然不協調的值是len(jobs)在日志輸出中的使用。使用它是否有意義取決于你,因為它的值在并發世界中是沒有意義的,但它是安全的,因為它是為并發使用而同步的,并且沒有基于該值的邏輯。
buf := 5
workers := 3
jobs := make(chan int, buf)
// results buffer must always be larger than workers + buf to prevent deadlock
results := make(chan int, buf*2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start workers
var wg sync.WaitGroup
for n := 0; n < workers; n++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
for jobID := range jobs {
fmt.Printf("worker %v processing %v - %v jobs left\n", n, jobID, len(jobs))
time.Sleep(time.Duration(rand.Intn(5)) * pollInterval)
results <- jobID
}
fmt.Printf("worker %v exited", n)
}(n)
}
var done sync.WaitGroup
done.Add(1)
go func() {
defer done.Done()
ticker := time.NewTicker(pollInterval)
r := make([]string, 0)
flushResults := func() {
fmt.Printf("===> results: %v\n", strings.Join(r, ","))
r = r[:0]
}
for {
select {
case <-ticker.C:
flushResults()
// send max buf jobs, or fill the queue
for i := 0; i < buf; i++ {
jobID++
select {
case jobs <- jobID:
continue
}
break
}
fmt.Printf("===> send %v jobs\n", i)
case jobID := <-results:
r = append(r, fmt.Sprintf("%v", jobID))
case <-ctx.Done():
// Close jobs channel to stop workers
close(jobs)
// Wait for workers to exit
wg.Wait()
// we can close results for easy iteration because we know
// there are no more workers.
close(results)
// Flush remaining results
for jobID := range results {
r = append(r, fmt.Sprintf("%v", jobID))
}
flushResults()
return
}
}
}()
- 1 回答
- 0 關注
- 129 瀏覽
添加回答
舉報