亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

具有緩沖作業和固定輪詢間隔的工作池

具有緩沖作業和固定輪詢間隔的工作池

Go
呼喚遠方 2022-05-23 15:59:03
我有一個工作人員池在工作頻道上監聽,并在結果頻道上做出響應。作業生產者必須以固定的代碼間隔運行。在讀取足夠的新作業以填滿緩沖區之前,必須刷新結果。批量刷新結果和讀取新作業至關重要。請參閱下面的示例代碼,在這里的操場上運行它。是否可以在沒有原子計數器的情況下重寫它來跟蹤飛行作業?
查看完整描述

1 回答

?
開滿天機

TA貢獻1786條經驗 獲得超13個贊

這是您的代碼的基于通道的版本,在功能上等同于上面示例的意圖。關鍵點是我們沒有使用任何原子值來改變代碼的邏輯,因為這不提供 goroutine 之間的同步。goroutine 之間的所有交互都使用通道sync.WaitGroup、 或同步context.Context??赡苡懈玫姆椒▉斫鉀Q手頭的問題,但這表明沒有必要的原子來協調隊列和工作人員。


這里唯一在 goroutines 之間仍然不協調的值是len(jobs)在日志輸出中的使用。使用它是否有意義取決于你,因為它的值在并發世界中是沒有意義的,但它是安全的,因為它是為并發使用而同步的,并且沒有基于該值的邏輯。


buf := 5

workers := 3

jobs := make(chan int, buf)


// results buffer must always be larger than workers + buf to prevent deadlock

results := make(chan int, buf*2)


ctx, cancel := context.WithCancel(context.Background())

defer cancel()


// Start workers

var wg sync.WaitGroup

for n := 0; n < workers; n++ {

    wg.Add(1)

    go func(n int) {

        defer wg.Done()

        for jobID := range jobs {

            fmt.Printf("worker %v processing %v - %v jobs left\n", n, jobID, len(jobs))

            time.Sleep(time.Duration(rand.Intn(5)) * pollInterval)

            results <- jobID

        }

        fmt.Printf("worker %v exited", n)

    }(n)

}


var done sync.WaitGroup

done.Add(1)

go func() {

    defer done.Done()

    ticker := time.NewTicker(pollInterval)

    r := make([]string, 0)


    flushResults := func() {

        fmt.Printf("===> results: %v\n", strings.Join(r, ","))

        r = r[:0]

    }


    for {

        select {

        case <-ticker.C:

            flushResults()


            // send max buf jobs, or fill the queue

            for i := 0; i < buf; i++ {

                jobID++

                select {

                case jobs <- jobID:

                    continue

                }

                break

            }

            fmt.Printf("===> send %v jobs\n", i)


        case jobID := <-results:

            r = append(r, fmt.Sprintf("%v", jobID))


        case <-ctx.Done():

            // Close jobs channel to stop workers

            close(jobs)

            // Wait for workers to exit

            wg.Wait()


            // we can close results for easy iteration because we know

            // there are no more workers.

            close(results)

            // Flush remaining results

            for jobID := range results {

                r = append(r, fmt.Sprintf("%v", jobID))

            }

            flushResults()

            return

        }

    }

}()


查看完整回答
反對 回復 2022-05-23
  • 1 回答
  • 0 關注
  • 129 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號