亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何處理可以無阻塞增長的隊列

如何處理可以無阻塞增長的隊列

Go
郎朗坤 2023-03-21 16:05:15
如果隊列可以從處理函數本身增長,我試圖了解如何在 Go 中處理隊列。請參閱下面的代碼。在此偽代碼中,我想將我創建的處理程序數量限制為 10。因此我創建了 10 個處理隊列的處理程序。然后我用一個 url 開始排隊。我的問題是,根據文檔,sender通道將阻塞,直到接收方接收到數據。在下面的代碼中,每個進程都是一個處理新 url 的接收器。然而,很容易看出,如果一個進程向隊列發送 11 個鏈接,它將阻塞,直到所有接收者都處理完這些新鏈接。如果這些接收者每個都有 1 個鏈接,那么它們也會在將新的 1 個鏈接發送到隊列時阻塞。由于每個人都被阻止,所以什么都沒有完成。我想知道 go 的一般解決方案是什么,用于處理可以從進程本身增長的隊列。請注意,我認為我可以通過鎖定名為 的數組來執行此操作queue,但我正在嘗試了解如何使用通道來完成此操作。var queue = make(chan string)func process(){    for currentURL := range queue {        links, _ := ... // some http call that gets links from a url        for _, link := links {            queue <- link        }    }}func main () {   for i :=0; i < 10; i++ {        go process()   }   queue <- "https://stackoverflow.com"   ...   // block until receive some quit message   <-quit }
查看完整描述

2 回答

?
拉丁的傳說

TA貢獻1789條經驗 獲得超8個贊

您可以使用的一種簡單方法是將用于將鏈接添加到頻道的代碼移動到它自己的 go 例程中。這樣,您的主要處理可以繼續,而阻塞的通道寫入將阻塞一個單獨的 go 例程。


func process(){

    for currentURL := range queue {

        links, _ := ... // some http call that gets links from a url

        for _, link := links {

            l := link // this is important! ...

            // the loop will re-set the value of link before the go routine is started


            go func(l) {

                queue <- link // we'll be blocked here...

                // but the "parent" routine can still iterate through the channel

                // which in turn un-blocks the write

            }(l)

        }

    }

}

使用信號量示例編輯以限制 go 例程:


func main () {

    maxWorkers := 5000

    sem := semaphore.NewWeighted(int64(maxWorkers))

    ctx := context.TODO()

    for i :=0; i < 10; i++ {

        go process(ctx)

    }


    queue <- "https://stackoverflow.com"

    // block until receive some quit message

    <-quit 

}


func process(ctx context.Context){

    for currentURL := range queue {

        links, _ := ... // some http call that gets links from a url

        for _, link := links {

            l := link // this is important! ...

            // the loop will re-set the value of link before the go routine is started


            // acquire a go routine...

            // if we are at the routine limit, this line will block until one becomes available

            sem.Acquire(ctx, 1)

            go func(l) {

                defer sem.Release(1)

                queue <- link // we'll be blocked here...

                // but the "parent" routine can still iterate through the channel

                // which in turn un-blocks the write

            }(l)

        }

    }

}

但是這個選項最終可能會導致死鎖...假設所有的 go 例程都已聲明,父循環可能會被鎖定在sem.Acquire. 這將導致子例程永遠不會添加到通道中,因此永遠不會執行 deferred sem.Release。在我的腦海中,我正在努力想出一個很好的方法來處理這個問題。也許是外部內存隊列而不是通道?


查看完整回答
反對 回復 2023-03-21
?
弒天下

TA貢獻1818條經驗 獲得超8個贊

有兩件事你可以做,要么使用緩沖通道不阻塞,即使另一端沒有人接收。這樣您就可以立即刷新通道內的值。


一種更有效的方法是檢查通道中是否有任何可用值,或者通道是否關閉,這應該由發送方在發送所有值時關閉。


接收者可以通過為接收表達式分配第二個參數來測試通道是否已關閉。


v, ok := <-ch 

ok如果false沒有更多的值可以接收并且通道關閉。使用 select as 檢查通道內的值


package main


import (

    "fmt"

    "sync"

)


var queue = make(chan int)

var wg sync.WaitGroup


func process(){

        values := []int{1,2,5,3,9,7}

        for _, value := range values {

            queue <- value        

        }

}


func main () {

   for i :=0; i < 10; i++ {

        go process()

   }

   wg.Add(1)

   go func(){

      defer wg.Done()

      for j:=0;j<30;j++ {

          select {

             case <-queue:

        fmt.Println(<-queue)

          } 

      }

   }()

   wg.Wait()

   close(queue)

}


查看完整回答
反對 回復 2023-03-21
  • 2 回答
  • 0 關注
  • 103 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號