2 回答

TA貢獻1818條經驗 獲得超8個贊
讓我們看看你的程序在做什么。您首先初始化了三個緩沖通道 var1、var2、var3
var1 := make(chan string, 1500)
var2 := make(chan string, 1500)
var3 := make(chan string, 1500)
現在你初始化了一個 WaitGroup (wg)
var wg sync.WaitGroup
現在您定義了變量計數并且該變量是空字符串
count = ""
下一部分是一個從 0 到 15 并生成 15 個 worker1 go 例程的循環
for i := 0; i < 15; i++ {
time.Sleep(time.Second)
wg.Add(1)
go worker1(var1, var2, var3, &wg)
}
每次啟動一個 worker1 時,執行例行程序并將通道和指針傳遞給 worker1 中的 waitgroup (wg)。
但是worker1會做什么呢?
func worker1(var1 chan string, var2 chan string, var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var1 {
var2 <- ch
}
}
worker1 將等待通道 var1 從該通道獲取數據并將其傳遞給通道 var2。
這可以。你絕對不需要這個 time.Sleep(time.Second)。
我們下一步吧。
您現在創建一個新循環,它將生成另外 15 個 go 例程 (worker2)。
for i := 0; i < 15; i++ {
time.Sleep(time.Second)
wg.Add(1)
go worker2(var2, var3, &wg)
}
worker2 將從通道 var2 中獲取所有內容并將其傳遞給通道 var3
func worker2(var2 chan string, var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var2 {
var3 <- ch
}
}
現在你為 worker3 創建了另外 15 個 go 例程。
for i := 0; i < 15; i++ {
time.Sleep(time.Second)
wg.Add(1)
go worker3(var3, &wg)
}
worker3 通過將通道 var3 附加到計數字符串來處理該數據的所有內容
最后一段代碼是將數據播種到通道。該循環從 0 到 100000 并且對于每個數字將它們轉換為字符串并將其傳遞給通道 var1
下一個程序將等待所有程序完成并打印結果。
好的,這段代碼有一些問題。
在每個 goroutine 之前你絕對不需要這個 time.Sleep(time.Second),你也不需要在 wg.Wait() 之前的 time.Sleep。
這種類型的工作負載不需要緩沖通道。這是一個簡單的管道,您可以使用無緩沖的通道,并且該通道將用于任務之間的同步。
當您更改代碼以使用無緩沖通道并刪除這些時間時。睡眠仍然有問題。問題是 go lang 運行時顯示的錯誤:
fatal error: all goroutines are asleep - deadlock!
并終止代碼。
但是為什么會發生這種情況,我們有sync.WaitGroup,一切看起來都很好。讓我們看一個具有相同錯誤的更簡單的程序。
package main
import (
"log"
"strconv"
"sync"
)
func worker(var1 <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for e := range var1 {
log.Printf("Element e %s ", e)
}
}
func main() {
var1 := make(chan string)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go worker(var1, &wg)
}
for i := 0; i < 15; i++ {
var1 <- strconv.Itoa(i)
}
wg.Wait()
}
此代碼也會產生與您的代碼相同的錯誤。這是因為這些通道永遠不會關閉,go 例程(工作人員)將永遠等待通道中的新數據。運行時檢測并終止該進程。
為了防止這種類型的錯誤,我們需要添加一些機制來告訴 gorutine 我們已經完成并且 goroutine 可以停止在該通道上偵聽并正確完成。
發送該信號的最簡單方法是關閉該 goroutine 讀取的通道。這是解決問題的代碼。
package main
import (
"log"
"strconv"
"sync"
)
func worker(var1 <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for e := range var1 {
log.Printf("Element e %s ", e)
}
}
func main() {
var1 := make(chan string)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go worker(var1, &wg)
}
for i := 0; i < 15; i++ {
var1 <- strconv.Itoa(i)
}
close(var1)
wg.Wait()
}
并且此代碼不會產生錯誤。此代碼將被正確終止。
但是有一個竅門。你怎么能在你的代碼中做到這一點?從 var1 通道讀取的 goroutine 有 15 個,從 var2 通道讀取的 goroutine 有 15 個,從 var3 通道讀取的 goroutine 有 15 個。
很難知道什么時候可以關閉哪個頻道。我們知道通道 var1 首先處理數據,因此我們可以在生產者完成同步通道中的插入數據時關閉它們。原因是在讀取之前的數據之前,我們不能將新數據插入通道。因此,當生產者插入所有數據時,我們知道通道 var1 上的所有數據都已處理,因此關閉通道是安全的。但是頻道var2和var3呢?
有 15 個不同的 goroutine 等待通道 var2 和 15 個等待 var3,這意味著我們需要找到一種方法來在 var2 上的所有處理完成時關閉 var2(在所有 goroutines worker1 中),對于 var3 也是如此。這可以通過創建兩個額外的 goroutine 來完成
wg1 和 wg2 并使用該 goroutine 為 worker1 和 worker2 生成 goroutine,這些 goroutine 將作為一個協調器工作,在這些函數內部,我們為 worker1 和 worker2 創建新的 sync.Group,這些函數將知道所有這些子 goroutine 的時間完成的。因此,對于 wg1,當所有這些 worker1 goroutine 都完成后,我們可以安全地關閉 var2 通道。wg2 和 var3 通道相同。
這些是 wg1 和 wg2 函數
// wg1
wg.Add(1)
go func() {
log.Printf("Starting WG1 master go routine")
var wg1 sync.WaitGroup
defer func() {
close(var2)
wg.Done()
}()
for i := 0; i < 15; i++ {
wg1.Add(1)
go worker1(var1, var2, &wg1)
}
wg1.Wait()
}()
// wg2
wg.Add(1)
go func() {
log.Printf("Starting WG2 routine for second stage")
defer func() {
close(var3)
wg.Done()
}()
var wg2 sync.WaitGroup
for i := 0; i < 15; i++ {
wg2.Add(1)
go worker2(var2, var3, &wg2)
}
wg2.Wait()
}()

TA貢獻1828條經驗 獲得超6個贊
是的,這很復雜,但是有一些經驗法則可以讓事情變得更加簡單。
更喜歡對傳遞給 go-routines 的通道使用形式參數,而不是訪問全局范圍內的通道。您可以通過這種方式獲得更多的編譯器檢查,以及更好的模塊化。
避免在特定的 go-routine(包括“主”)中在同一通道上讀取和寫入。否則,死鎖的風險要大得多。
- 2 回答
- 0 關注
- 125 瀏覽
添加回答
舉報