3 回答

TA貢獻1848條經驗 獲得超10個贊
在其他答案之上。
請(非常)小心,關閉通道應該發生在寫入調用站點上,而不是讀取調用站點上。在正在寫入的GoCountColumns通道中r,關閉通道的責任落在GoCountColumns函數上。技術原因是,它是唯一確定該通道將不再被寫入的參與者,因此可以安全關閉。
func GoCountColumns(in chan []string, r chan Result, quit chan int) {
defer close(r) // this line.
for {
select {
case data := <-in:
r <- countColumns(data) // some calculation function
case <-quit:
return // stop goroutine
}
}
}
如果我可以說,函數參數命名約定是將目標作為第一個參數,將源作為第二個參數,然后使用其他參數。GoCountColumns優選地寫成:
func GoCountColumns(dst chan Result, src chan []string, quit chan int) {
defer close(dst)
for {
select {
case data := <-src:
dst <- countColumns(data) // some calculation function
case <-quit:
return // stop goroutine
}
}
}
quit您在流程開始后立即致電。這是不合邏輯的。該quit命令是強制退出序列,一旦檢測到退出信號就應該調用它,以盡可能以最佳狀態(可能全部損壞)強制退出當前處理。換句話說,您應該依賴該signal.Notify包來捕獲退出事件,并通知您的工作人員退出。請參閱https://golang.org/pkg/os/signal/#example_Notify
為了編寫更好的并行代碼,首先列出管理程序生命周期所需的例程,確定需要阻塞的例程以確保程序在退出之前完成。
在您的代碼中,存在read, map。為了確保處理完整,程序主函數必須確保在退出時捕獲信號,map然后再退出。請注意,該read功能并不重要。
然后,您還需要從用戶輸入捕獲退出事件所需的代碼。
總的來說,我們似乎需要阻止兩個事件來管理生命周期。示意性地說,
func main(){
go read()
go map(mapDone)
go signal()
select {
case <-mapDone:
case <-sig:
}
}
這個簡單的代碼很好process or die。事實上,當捕獲到用戶事件時,程序立即退出,而不給其他例程機會執行停止時所需的操作。
為了改善這些行為,您首先需要一種方法來表明程序想要離開其他例程,其次需要一種方法來等待這些例程在離開之前完成其停止序列。
要發出退出事件或取消信號,您可以使用 a context.Context,將其傳遞給工作人員,讓他們聽。
再次,示意性地,
func main(){
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
go map(ctx,mapDone)
go signal()
select {
case <-mapDone:
case <-sig:
cancel()
}
}
(稍后將詳細閱讀和繪制地圖)
要等待完成,很多事情都是可能的,只要它們是線程安全的。通常,sync.WaitGroup使用 a?;蛘?,在像您這樣的情況下,只有一個例程需要等待,我們可以重新使用當前mapDone通道。
func main(){
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
go map(ctx,mapDone)
go signal()
select {
case <-mapDone:
case <-sig:
cancel()
<-mapDone
}
}
這很簡單也很直接。但這并不完全正確。最后一個mapDone chan可能會永遠阻塞并使程序無法停止。因此,您可以實現第二個信號處理程序或超時。
示意性地,超時解決方案是
func main(){
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
go map(ctx,mapDone)
go signal()
select {
case <-mapDone:
case <-sig:
cancel()
select {
case <-mapDone:
case <-time.After(time.Second):
}
}
}
您還可以在最后一次選擇中累積信號處理和超時。
最后,有幾件事要講read和map上下文聆聽。
首先map,實現需要定期讀取context.Done通道來檢測cancellation。
這是簡單的部分,只需要更新 select 語句。
func GoCountColumns(ctx context.Context, dst chan Result, src chan []string) {
defer close(dst)
for {
select {
case <-ctx.Done():
<-time.After(time.Minute) // do something more useful.
return // quit. Notice the defer will be called.
case data := <-src:
dst <- countColumns(data) // some calculation function
}
}
}
現在這read部分有點棘手,因為它是一個 IO,它不提供select強大的編程接口,并且監聽上下文通道取消可能看起來很矛盾。這是。由于 IO 是阻塞的,因此無法偵聽上下文。并且在從上下文通道讀取時,無法讀取 IO。在您的情況下,解決方案需要了解您的讀取循環與您的程序生命周期無關(還記得我們只監聽mapDone嗎?),并且我們可以忽略上下文。
在其他情況下,例如,如果您想在讀取最后一個字節時重新啟動(因此在每次讀取時,我們都會增加 n,計算字節數,并且我們希望在停止時保存該值)。然后,需要啟動一個新的例程,因此,多個例程需要等待完成。在這種情況下,async.WaitGroup會更合適。
示意性地說,
func main(){
var wg sync.WaitGroup
processDone:=make(chan struct{})
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
wg.Add(1)
go saveN(ctx,&wg)
wg.Add(1)
go map(ctx,&wg)
go signal()
go func(){
wg.Wait()
close(processDone)
}()
select {
case <-processDone:
case <-sig:
cancel()
select {
case <-processDone:
case <-time.After(time.Second):
}
}
}
在最后的代碼中,正在傳遞等待組。例程負責調用wg.Done(),當所有例程完成后,processDone通道關閉,以發出選擇信號。
func GoCountColumns(ctx context.Context, dst chan Result, src chan []string, wg *sync.WaitGroup) {
defer wg.Done()
defer close(dst)
for {
select {
case <-ctx.Done():
<-time.After(time.Minute) // do something more useful.
return // quit. Notice the defer will be called.
case data := <-src:
dst <- countColumns(data) // some calculation function
}
}
}
尚未確定哪種模式是首選,但您也可能會看到waitgroup僅在調用站點進行管理。
func main(){
var wg sync.WaitGroup
processDone:=make(chan struct{})
ctx,cancel := context.WithCancel(context.WithBackground())
go read(ctx)
wg.Add(1)
go func(){
defer wg.Done()
saveN(ctx)
}()
wg.Add(1)
go func(){
defer wg.Done()
map(ctx)
}()
go signal()
go func(){
wg.Wait()
close(processDone)
}()
select {
case <-processDone:
case <-sig:
cancel()
select {
case <-processDone:
case <-time.After(time.Second):
}
}
}
除了所有這些問題和 OP 問題之外,您必須始終預先評估并行處理對于給定任務的相關性。沒有獨特的秘訣,練習和衡量你的代碼性能。參見 pprof.

TA貢獻1850條經驗 獲得超11個贊
這段代碼中發生的事情太多了。您應該將代碼重組為服務于特定目的的短函數,以便其他人可以輕松地幫助您(也可以幫助您自己)。
有多種方法可以讓一個 go-routine 等待其他工作完成。最常見的方法是使用等待組(我提供的示例)或通道。
func processSomething(...) {
? ? ...
}
func main() {
? ? workers := &sync.WaitGroup{}
? ? for i := 0; i < numWorkers; i++ {
? ? ? ? workers.Add(1) // you want to call this from the calling go-routine and before spawning the worker go-routine
? ? ? ? go func() {
? ? ? ? ? ? defer workers.Done() // you want to call this from the worker go-routine when the work is done (NOTE the defer, which ensures it is called no matter what)
? ? ? ? ? ? processSomething(....) // your async processing
? ? ? ? }()
? ? }
? ? // this will block until all workers have finished their work
? ? workers.Wait()
}

TA貢獻1775條經驗 獲得超8個贊
您可以使用通道來阻塞,main直到 Goroutine 完成。
package main
import (
"log"
"time"
)
func main() {
c := make(chan struct{})
go func() {
time.Sleep(3 * time.Second)
log.Println("bye")
close(c)
}()
// This blocks until the channel is closed by the routine
<-c
}
無需向通道寫入任何內容。讀取會被阻塞,直到讀取數據或者我們在這里使用的通道關閉為止。
- 3 回答
- 0 關注
- 191 瀏覽
添加回答
舉報