亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何從主線程退出

如何從主線程退出

Go
繁花不似錦 2023-08-07 10:55:32
func GoCountColumns(in chan []string, r chan Result, quit chan int) {    for {        select {        case data := <-in:            r <- countColumns(data) // some calculation function        case <-quit:            return // stop goroutine        }    }}func main() {    fmt.Println("Welcome to the csv Calculator")    file_path := os.Args[1]    fd, _ := os.Open(file_path)    reader := csv.NewReader(bufio.NewReader(fd))    var totalColumnsCount int64 = 0    var totallettersCount int64 = 0    linesCount := 0    numWorkers := 10000    rc := make(chan Result, numWorkers)    in := make(chan []string, numWorkers)    quit := make(chan int)    t1 := time.Now()    for i := 0; i < numWorkers; i++ {        go GoCountColumns(in, rc, quit)    }    //start worksers    go func() {        for {            record, err := reader.Read()            if err == io.EOF {                break            }            if err != nil {                log.Fatal(err)            }            if linesCount%1000000 == 0 {                fmt.Println("Adding to the channel")            }            in <- record            //data := countColumns(record)            linesCount++            //totalColumnsCount = totalColumnsCount + data.ColumnCount            //totallettersCount = totallettersCount + data.LettersCount        }        close(in)    }()    for i := 0; i < numWorkers; i++ {        quit <- 1 // quit goroutines from main    }    close(rc)    for i := 0; i < linesCount; i++ {        data := <-rc        totalColumnsCount = totalColumnsCount + data.ColumnCount        totallettersCount = totallettersCount + data.LettersCount    }    fmt.Printf("I counted %d lines\n", linesCount)    fmt.Printf("I counted %d columns\n", totalColumnsCount)    fmt.Printf("I counted %d letters\n", totallettersCount)    elapsed := time.Now().Sub(t1)    fmt.Printf("It took %f seconds\n", elapsed.Seconds())}My Hello World 是一個讀取 csv 文件并將其傳遞到通道的程序。然后 goroutine 應該從這個通道消費。我的問題是我不知道如何從主線程檢測所有數據都已處理并且我可以退出程序。
查看完整描述

3 回答

?
慕桂英546537

TA貢獻1848條經驗 獲得超10個贊

在其他答案之上。


請(非常)小心,關閉通道應該發生在寫入調用站點上,而不是讀取調用站點上。在正在寫入的GoCountColumns通道中r,關閉通道的責任落在GoCountColumns函數上。技術原因是,它是唯一確定該通道將不再被寫入的參與者,因此可以安全關閉。

    func GoCountColumns(in chan []string, r chan Result, quit chan int) {

        defer close(r)     // this line.

        for {

            select {

            case data := <-in:

                r <- countColumns(data) // some calculation function

            case <-quit:

                return // stop goroutine

            }

        }

    }

如果我可以說,函數參數命名約定是將目標作為第一個參數,將源作為第二個參數,然后使用其他參數。GoCountColumns優選地寫成:

    func GoCountColumns(dst chan Result, src chan []string, quit chan int) {

        defer close(dst)

        for {

            select {

            case data := <-src:

                dst <- countColumns(data) // some calculation function

            case <-quit:

                return // stop goroutine

            }

        }

    }

quit您在流程開始后立即致電。這是不合邏輯的。該quit命令是強制退出序列,一旦檢測到退出信號就應該調用它,以盡可能以最佳狀態(可能全部損壞)強制退出當前處理。換句話說,您應該依賴該signal.Notify包來捕獲退出事件,并通知您的工作人員退出。請參閱https://golang.org/pkg/os/signal/#example_Notify

為了編寫更好的并行代碼,首先列出管理程序生命周期所需的例程,確定需要阻塞的例程以確保程序在退出之前完成。


在您的代碼中,存在read, map。為了確保處理完整,程序主函數必須確保在退出時捕獲信號,map然后再退出。請注意,該read功能并不重要。


然后,您還需要從用戶輸入捕獲退出事件所需的代碼。


總的來說,我們似乎需要阻止兩個事件來管理生命周期。示意性地說,


func main(){

    go read()

    go map(mapDone)

    go signal()

    select {

        case <-mapDone:

        case <-sig:

    }

}

這個簡單的代碼很好process or die。事實上,當捕獲到用戶事件時,程序立即退出,而不給其他例程機會執行停止時所需的操作。


為了改善這些行為,您首先需要一種方法來表明程序想要離開其他例程,其次需要一種方法來等待這些例程在離開之前完成其停止序列。


要發出退出事件或取消信號,您可以使用 a context.Context,將其傳遞給工作人員,讓他們聽。


再次,示意性地,


func main(){

    ctx,cancel := context.WithCancel(context.WithBackground())

    go read(ctx)

    go map(ctx,mapDone)

    go signal()

    select {

        case <-mapDone:

        case <-sig:

            cancel()

    }

}

(稍后將詳細閱讀和繪制地圖)


要等待完成,很多事情都是可能的,只要它們是線程安全的。通常,sync.WaitGroup使用 a?;蛘?,在像您這樣的情況下,只有一個例程需要等待,我們可以重新使用當前mapDone通道。


func main(){

    ctx,cancel := context.WithCancel(context.WithBackground())

    go read(ctx)

    go map(ctx,mapDone)

    go signal()

    select {

        case <-mapDone:

        case <-sig:

            cancel()

            <-mapDone

    }

}

這很簡單也很直接。但這并不完全正確。最后一個mapDone chan可能會永遠阻塞并使程序無法停止。因此,您可以實現第二個信號處理程序或超時。


示意性地,超時解決方案是


func main(){

    ctx,cancel := context.WithCancel(context.WithBackground())

    go read(ctx)

    go map(ctx,mapDone)

    go signal()

    select {

        case <-mapDone:

        case <-sig:

            cancel()

            select {

                case <-mapDone:

                case <-time.After(time.Second):

            }

    }

}

您還可以在最后一次選擇中累積信號處理和超時。


最后,有幾件事要講read和map上下文聆聽。


首先map,實現需要定期讀取context.Done通道來檢測cancellation。


這是簡單的部分,只需要更新 select 語句。


    func GoCountColumns(ctx context.Context, dst chan Result, src chan []string) {

        defer close(dst)

        for {

            select {

            case <-ctx.Done():

                <-time.After(time.Minute) // do something more useful.

                return // quit. Notice the defer will be called.

            case data := <-src:

                dst <- countColumns(data) // some calculation function

            }

        }

    }

現在這read部分有點棘手,因為它是一個 IO,它不提供select強大的編程接口,并且監聽上下文通道取消可能看起來很矛盾。這是。由于 IO 是阻塞的,因此無法偵聽上下文。并且在從上下文通道讀取時,無法讀取 IO。在您的情況下,解決方案需要了解您的讀取循環與您的程序生命周期無關(還記得我們只監聽mapDone嗎?),并且我們可以忽略上下文。


在其他情況下,例如,如果您想在讀取最后一個字節時重新啟動(因此在每次讀取時,我們都會增加 n,計算字節數,并且我們希望在停止時保存該值)。然后,需要啟動一個新的例程,因此,多個例程需要等待完成。在這種情況下,async.WaitGroup會更合適。


示意性地說,


func main(){

    var wg sync.WaitGroup

    processDone:=make(chan struct{})

    ctx,cancel := context.WithCancel(context.WithBackground())

    go read(ctx)

    wg.Add(1)

    go saveN(ctx,&wg)

    wg.Add(1)

    go map(ctx,&wg)

    go signal()

    go func(){

        wg.Wait()

        close(processDone)

    }()

    select {

        case <-processDone:

        case <-sig:

            cancel()

            select {

                case <-processDone:

                case <-time.After(time.Second):

            }

    }

}

在最后的代碼中,正在傳遞等待組。例程負責調用wg.Done(),當所有例程完成后,processDone通道關閉,以發出選擇信號。


    func GoCountColumns(ctx context.Context, dst chan Result, src chan []string, wg *sync.WaitGroup) {

        defer wg.Done()

        defer close(dst)

        for {

            select {

            case <-ctx.Done():

                <-time.After(time.Minute) // do something more useful.

                return // quit. Notice the defer will be called.

            case data := <-src:

                dst <- countColumns(data) // some calculation function

            }

        }

    }

尚未確定哪種模式是首選,但您也可能會看到waitgroup僅在調用站點進行管理。


func main(){

    var wg sync.WaitGroup

    processDone:=make(chan struct{})

    ctx,cancel := context.WithCancel(context.WithBackground())

    go read(ctx)

    wg.Add(1)

    go func(){

        defer wg.Done()

        saveN(ctx)

    }()

    wg.Add(1)

    go func(){

        defer wg.Done()

        map(ctx)

    }()

    go signal()

    go func(){

        wg.Wait()

        close(processDone)

    }()

    select {

        case <-processDone:

        case <-sig:

            cancel()

            select {

                case <-processDone:

                case <-time.After(time.Second):

            }

    }

}

除了所有這些問題和 OP 問題之外,您必須始終預先評估并行處理對于給定任務的相關性。沒有獨特的秘訣,練習和衡量你的代碼性能。參見 pprof.


查看完整回答
反對 回復 2023-08-07
?
慕蓋茨4494581

TA貢獻1850條經驗 獲得超11個贊

這段代碼中發生的事情太多了。您應該將代碼重組為服務于特定目的的短函數,以便其他人可以輕松地幫助您(也可以幫助您自己)。

有多種方法可以讓一個 go-routine 等待其他工作完成。最常見的方法是使用等待組(我提供的示例)或通道。

func processSomething(...) {

? ? ...

}


func main() {

? ? workers := &sync.WaitGroup{}


? ? for i := 0; i < numWorkers; i++ {

? ? ? ? workers.Add(1) // you want to call this from the calling go-routine and before spawning the worker go-routine


? ? ? ? go func() {

? ? ? ? ? ? defer workers.Done() // you want to call this from the worker go-routine when the work is done (NOTE the defer, which ensures it is called no matter what)

? ? ? ? ? ? processSomething(....) // your async processing

? ? ? ? }()

? ? }


? ? // this will block until all workers have finished their work

? ? workers.Wait()

}


查看完整回答
反對 回復 2023-08-07
?
www說

TA貢獻1775條經驗 獲得超8個贊

您可以使用通道來阻塞,main直到 Goroutine 完成。


package main


import (

    "log"

    "time"

)


func main() {

    c := make(chan struct{})


    go func() {

        time.Sleep(3 * time.Second)

        log.Println("bye")

        close(c)

    }()


    // This blocks until the channel is closed by the routine

    <-c

}

無需向通道寫入任何內容。讀取會被阻塞,直到讀取數據或者我們在這里使用的通道關閉為止。


查看完整回答
反對 回復 2023-08-07
  • 3 回答
  • 0 關注
  • 191 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號