亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

關閉和發送到通道之間的競爭條件

關閉和發送到通道之間的競爭條件

Go
一只甜甜圈 2023-08-14 17:17:21
我正在嘗試使用工作池構建通用管道庫。我為源、管道和接收器創建了一個接口。您會看到,管道的工作是從輸入通道接收數據,對其進行處理,然后將結果輸出到通道上。這是它的預期行為:從輸入通道接收數據。將數據委托給可用的工作人員。Worker 將結果發送到輸出通道。所有工作人員完成后關閉輸出通道。func (p *pipe) Process(in chan interface{}) (out chan interface{}) {    var wg sync.WaitGroup    out = make(chan interface{}, 100)    go func() {        for i := 1; i <= 100; i++ {            go p.work(in, out, &wg)        }        wg.Wait()        close(out)    }()    return}func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {    for j := range jobs {        func(j Job) {            defer wg.Done()            wg.Add(1)            res := doSomethingWith(j)            out <- res        }(j)    }}但是,運行它可能會退出而不處理所有輸入,或者出現錯誤并顯示send on closed channel消息。使用該標志構建源會在和-race之間發出數據爭用警告。close(out)out <- res我認為可能會發生以下情況。一旦一些工人完成了工作,wg計數器就會瞬間歸零。因此,wg.Wait()完成并且程序繼續進行close(out)。與此同時,作業通道尚未完成數據生成,這意味著一些工作人員仍在另一個 goroutine 中運行。由于out通道已經關閉,因此會導致恐慌。等待組應該放在其他地方嗎?或者有沒有更好的方法來等待所有工人完成?
查看完整描述

2 回答

?
撒科打諢

TA貢獻1934條經驗 獲得超2個贊

目前尚不清楚為什么每個工作需要一名工作人員,但如果您這樣做,您可以重組您的外循環設置(請參閱下面未經測試的代碼)。這首先就消除了對工作池的需要。


不過,在解雇任何員工之前,請務必wg.Add 先執行此操作。在這里,您正好剝離了 100 名員工:


var wg sync.WaitGroup

out = make(chan interface{}, 100)

go func() {

    for i := 1; i <= 100; i++ {

        go p.work(in, out, &wg)

    }

    wg.Wait()

    close(out)

}()

因此,您可以這樣做:


var wg sync.WaitGroup

out = make(chan interface{}, 100)

go func() {

    wg.Add(100)  // ADDED - count the 100 workers

    for i := 1; i <= 100; i++ {

        go p.work(in, out, &wg)

    }

    wg.Wait()

    close(out)

}()

請注意,您現在可以將wg其自身移至派生出工作線程的 goroutine 中。如果你放棄讓每個工人將工作分拆為新的 goroutine 的想法,這可以讓事情變得更干凈。但是,如果每個工作人員要派生另一個 goroutine,則該工作人員本身也必須使用wg.Add,如下所示:


for j := range jobs {

    wg.Add(1)  // ADDED - count the spun-off goroutines

    func(j Job) {

        res := doSomethingWith(j)


        out <- res

        wg.Done()  // MOVED (for illustration only, can defer as before)

    }(j)

}

wg.Done() // ADDED - our work in `p.work` is now done

wg.Add(1)也就是說,每個匿名函數都是通道的另一個用戶,因此在分拆新的 goroutine 之前增加通道用戶計數 ( )。當您讀完輸入通道后jobs,調用wg.Done()(可能通過較早的defer,但我在此處的末尾展示了它)。


思考這個問題的關鍵是計算此時可以wg寫入通道的活動 goroutine 的數量。只有當沒有goroutine 打算再寫入時,它才會變為零。 這使得關閉通道是安全的。


考慮使用相當簡單的(但未經測試):


func (p *pipe) Process(in chan interface{}) (out chan interface{}) {

    out = make(chan interface{})

    var wg sync.WaitGroup

    go func() {

        defer close(out)

        for j := range in {

            wg.Add(1)

            go func(j Job) {

                res := doSomethingWith(j)

                out <- res

                wg.Done()

            }(j)

        }

        wg.Wait()

    }()

    return out

}

現在,您有一個 goroutine 正在in以最快的速度讀取通道,并在讀取過程中分拆作業。每項傳入的工作都會獲得一個 goroutine,除非他們提前完成工作。沒有池,每個工作只有一名工人(與您的代碼相同,只是我們淘汰了沒有做任何有用事情的池)。


或者,由于只有一定數量的 CPU 可用,請像之前在開始時所做的那樣分拆一定數量的 goroutine,但讓每個 goroutine 運行一個作業直至完成,并交付其結果,然后返回讀取下一個作業:


func (p *pipe) Process(in chan interface{}) (out chan interface{}) {

    out = make(chan interface{})

    go func() {

        defer close(out)

        var wg sync.WaitGroup

        ncpu := runtime.NumCPU()  // or something fancier if you like

        wg.Add(ncpu)

        for i := 0; i < ncpu; i++ {

            go func() {

                defer wg.Done()

                for j := range in {

                    out <- doSomethingWith(j)

                }

            }()

        }

        wg.Wait()

    }

    return out

}

通過使用,runtime.NumCPU()我們只能獲得與運行作業的 CPU 一樣多的工作線程來讀取作業。這些是池,它們一次只做一項工作。


如果輸出通道讀取器結構良好(即不會導致管道阻塞),通常不需要緩沖輸出通道。如果不是,這里的緩沖深度會限制您可以“領先于”正在使用結果的人的工作數量。根據“提前工作”的有用程度來設置它,不一定是 CPU 數量、預期作業數量等。


查看完整回答
反對 回復 2023-08-14
?
一只萌萌小番薯

TA貢獻1795條經驗 獲得超7個贊

作業的完成速度可能與發送的速度一樣快。在這種情況下,即使有更多的項目需要處理,WaitGroup 也會在零附近浮動。


解決此問題的一種方法是在發送作業之前添加一項,并在發送所有作業后減少該作業,從而有效地將發送者視為“作業”之一。在這種情況下,我們最好在wg.Add發送方中執行以下操作:


func (p *pipe) Process(in chan interface{}) (out chan interface{}) {

    var wg sync.WaitGroup

    out = make(chan interface{}, 100)

    go func() {

        for i := 1; i <= 100; i++ {

            wg.Add(1)

            go p.work(in, out, &wg)

        }

        wg.Wait()

        close(out)

    }()


    return

}


func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {

    for j := range jobs {

        func(j Job) {

            res := doSomethingWith(j)


            out <- res

            wg.Done()

        }(j)

    }

}

我在代碼中注意到的一件事是,每個作業都會啟動一個 goroutine。同時,每個作業jobs循環處理通道,直到清空/關閉。似乎沒有必要兩者都做。


查看完整回答
反對 回復 2023-08-14
  • 2 回答
  • 0 關注
  • 203 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號