亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

生產者速度慢,消費者速度快的情況下如何處理通道關閉同步?

生產者速度慢,消費者速度快的情況下如何處理通道關閉同步?

Go
茅侃侃 2023-03-29 15:38:56
我是新手,找不到這個問題的答案。我正在做的是在生產者中讀取 CSV 文件,做一些可能需要時間的事情,然后通過通道將輸出發送給消費者。有一個生產者-消費者鏈,任何生產者最終都可能比它的消費者慢。producer(1 goroutine) -> chan0 -> consumer-producer-1(>1 goroutines) -> chan1 -> consumer-producer-2(>1 goroutines) -> chan2 -> consumer(>1 goroutines)這里最多可以有 15 個消費者?,F在我面臨的問題是,如果生產者完成了,如何在消費者方面做出決定,我們可以停止處理。我需要實現的是:一旦生產者完成,所有消費者最終應該做一些清理并在完成剩余的后退出如果消費者在特定的超時期限內沒有獲得任何數據,它可以退出(最好有一個信號)而不會進一步阻塞。它發生在序列中的所有生產者-消費者對上。我使用了以下方法。將信號通道與每個數據通道保持在一起,并為其下一個消費者的每個 goroutine 發布“完成”。讀取后,每個消費者應該只讀取通道中剩余的緩沖數據,然后將 5“完成”放在下一個信號通道上。確保每個 goroutine 只有 5 個,而不是 5 個(使用https://golang.org/pkg/sync/#Once.Do)。以下是我到這里為止能想到的。processRemaining = falsefor processRemaining == false{        select {        case stuff, ok := <-input_messages:                do_stuff(stuff)                if ok == false { // if channel has been closed                    processRemaining = true                }                if result != nil {                        //send to channel output_messages                }        case sig := <-input_signals: // if signaled to stopped.                fmt.Println("received signal", sig)                processRemaining = true        default:                fmt.Println("no activity")        }}if processRemaining {        for stuff := range input_messages {                do_stuff(stuff)                if result != nil {                        //send to channel output_messages                }        }        // send "output_routine" number of "done" to a channel "output_signals".}但即使在這種方法中,我也無法想出任何方式來表現與關閉的“input_messages”通道相同的方式,如果沒有可用的時間,比如 10 秒。這種方法有沒有我忽略的問題。解決這個問題的可能方法(或并發模式)是什么?確保:一旦第一個“chan0”關閉,所有后續通道都會關閉。所有的生產者在關閉他們的輸出通道之前都會被更新,只有當他們都完成了他們的寫入時,通道才會關閉。如果消費者在指定的超時時間內沒有從通道中獲取數據,它應該將其視為已關閉,并自行解除阻塞。
查看完整描述

1 回答

?
翻翻過去那場雪

TA貢獻2065條經驗 獲得超14個贊

使用 async.WaitGroup來跟蹤正在運行的 goroutines 的數量。每個 goroutine 在不再從通道獲取數據后退出。一旦WaitGroup完成,清理就可以完成了。


是這樣的:


import (

        "sync"

        "time"

)


type Data interface{} // just an example


type Consumer interface {

        Consume(Data) Data

        CleanUp()

        Count() int

        Timeout() time.Duration

}


func StartConsumers(consumer Consumer, inCh <-chan Data, outCh chan<- Data) {

        wg := sync.WaitGroup{}

        for i := 0; i < consumer.Count(); i++ {

                wg.Add(1)

                go func() {

                consumeLoop:

                        for {

                                select {

                                case v, ok := <-inCh: // 'ok' says if the channel is still open

                                        if !ok {

                                                break consumeLoop

                                        }

                                        outCh <- consumer.Consume(v)

                                case <-time.After(consumer.Timeout()):

                                        break consumeLoop

                                }

                        }


                        wg.Done()

                }()

        }

        wg.Wait()


        consumer.CleanUp()

        close(outCh)

}

在管道的每個階段,您都可以使用與上述類似的過程來啟動消費者。


查看完整回答
反對 回復 2023-03-29
  • 1 回答
  • 0 關注
  • 116 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號