亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

潛在遞歸任務的工作池(即,每個作業可以排隊其他作業)

潛在遞歸任務的工作池(即,每個作業可以排隊其他作業)

Go
慕尼黑5688855 2021-09-27 21:01:16
我正在編寫一個應用程序,用戶可以從多個“作業”(實際上是 URL)開始。在開始(主程序)時,我將這些 URL 添加到隊列中,然后啟動 x 處理這些 URL 的 goroutine。在特殊情況下,URL 指向的資源可能包含更多必須添加到隊列中的 URL。這 3 名工人正在等待新的工作進入并處理它們。問題是:一旦每個工人都在等待工作(并且沒有人生產任何工作),工人就應該完全停止。所以要么所有這些都有效,要么沒有一個有效。我當前的實現看起來像這樣,我認為它不優雅。不幸的是,我想不出一個更好的方法來不包含競爭條件,而且我不完全確定這個實現是否真的按預期工作:var queue // from somewhereconst WORKER_COUNT = 3var done chan struct{}func work(working chan int) {  absent := make(chan struct{}, 1)  // if x>1 jobs in sequence are popped, send to "absent" channel only 1 struct.  // This implementation also assumes that the select statement will be evaluated "in-order" (channel 2 only if channel 1 yields nothing) - is this actually correct? EDIT: It is, according to the specs.  one := false  for {    select {    case u, ok := <-queue.Pop():      if !ok {        close(absent)        return      }      if !one {        // I have started working (delta + 1)        working <- 1        absent <- struct{}{}        one = true      }      // do work with u (which may lead to queue.Push(urls...))    case <-absent: // no jobs at the moment. consume absent => wait      one = false      working <- -1    }  }}func Start() {  working := make(chan int)  for i := 0; i < WORKER_COUNT; i++ {    go work(working)  }  // the amount of actually working workers...  sum := 0  for {    delta := <-working    sum += delta    if sum == 0 {      queue.Close() // close channel -> kill workers.      done <- struct{}{}      return    }  }}有沒有更好的方法來解決這個問題?
查看完整描述

1 回答

?
德瑪西亞99

TA貢獻1770條經驗 獲得超3個贊

您可以使用 sync.WaitGroup(請參閱文檔)來控制工作人員的生命周期,并使用非阻塞發送,以便工作人員在嘗試排隊更多作業時不會死鎖:


package main


import "sync"


const workers = 4


type job struct{}


func (j *job) do(enqueue func(job)) {

    // do the job, calling enqueue() for subtasks as needed

}


func main() {

    jobs, wg := make(chan job), new(sync.WaitGroup)

    var enqueue func(job)


    // workers

    for i := 0; i < workers; i++ {

        go func() {

            for j := range jobs {

                j.do(enqueue)

                wg.Done()

            }

        }()

    }


    // how to queue a job

    enqueue = func(j job) {

        wg.Add(1)

        select {

        case jobs <- j: // another worker took it

        default: // no free worker; do the job now

            j.do(enqueue)

            wg.Done()

        }

    }


    todo := make([]job, 1000)

    for _, j := range todo {

        enqueue(j)

    }

    wg.Wait()

    close(jobs)

}

嘗試使用緩沖通道避免死鎖的困難在于,您必須預先分配一個足夠大的通道,以確保在不阻塞的情況下保持所有掛起的任務。除非您有少量已知的 URL 可供抓取,否則會出現問題。


當您回退到在當前線程中進行普通遞歸時,您沒有那個靜態緩沖區大小限制。當然,仍然存在限制:如果有太多工作待處理,您可能會耗盡 RAM,理論上您可以通過深度遞歸耗盡堆棧(但這很難?。?。因此,如果您要對整個 Web 進行爬行,則需要以更復雜的方式跟蹤待處理的任務。


最后,作為一個更完整的例子,我對這段代碼并不感到非常自豪,但我碰巧寫了一個函數來啟動一個并行排序,它以與獲取 URL 的方式相同的方式遞歸。


查看完整回答
反對 回復 2021-09-27
  • 1 回答
  • 0 關注
  • 197 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號