亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Go:學習通道和排隊,致命錯誤

Go:學習通道和排隊,致命錯誤

Go
aluckdog 2021-12-13 10:45:37
我正在嘗試學習如何使用通道在 Go 中為我的其他項目之一創建隊列。我的另一個項目基本上是對數據庫行進行排隊,然后使用行中的詳細信息對數據庫進行數字運算。我不希望工作人員同時處理同一行,因此它需要檢查工作人員當前是否正在處理該特定行 ID,如果是,則等待它完成。如果不是同一個row ID,可以異步運行,但是我也想限制可以同時運行的異步worker的數量。在我下面的代碼中,我目前試圖將其限制為三名工人。這是我所擁有的:package mainimport (    "log"    "strconv"    "time")// RowInfo holds the job infotype RowInfo struct {    id int}// WorkerCount holds how many workers are currently runningvar WorkerCount int// WorkerLocked specifies whether a row ID is currently processing by a workervar WorkerLocked map[string]bool// Process the RowInfofunc worker(row RowInfo) {    rowID := strconv.Itoa(row.id)    WorkerCount++    WorkerLocked[rowID] = true    time.Sleep(1 * time.Second)    log.Printf("ID rcvd: %d", row.id)    WorkerLocked[rowID] = false    WorkerCount--}// waiter will check if the row is already processing in a worker// Block until it finishes completion, then dispatchfunc waiter(row RowInfo) {    rowID := strconv.Itoa(row.id)    for WorkerLocked[rowID] == true {        time.Sleep(1 * time.Second)    }    go worker(row)}func main() {    jobsQueue := make(chan RowInfo, 10)    WorkerLocked = make(map[string]bool)    // Dispatcher waits for jobs on the channel and dispatches to waiter    go func() {        // Wait for a job        for {            // Only have a max of 3 workers running asynch at a time            for WorkerCount > 3 {                time.Sleep(1 * time.Second)            }            job := <-jobsQueue            go waiter(job)        }    }()    // Test the queue, send some data    for i := 0; i < 12; i++ {        r := RowInfo{            id: i,        }        jobsQueue <- r    }    // Prevent exit!    for {        time.Sleep(1 * time.Second)    }}
查看完整描述

2 回答

?
ABOUTYOU

TA貢獻1812條經驗 獲得超5個贊

如果您要使用WorkerLocked地圖,則需要使用sync包保護對其的訪問。您還需要以WorkerCount相同的方式(或使用原子操作)進行保護。這樣做也會使睡眠變得不必要(使用條件變量)。

更好的是,讓 3 個(或多個)工作人員等待行使用通道處理。然后,您會將行分配給各個工作人員,以便特定的工作人員始終處理特定的行(例如,使用 row.id % 3 來確定將行發送到哪個工作人員/通道)。


查看完整回答
反對 回復 2021-12-13
?
幕布斯6054654

TA貢獻1876條經驗 獲得超7個贊

我強烈建議不要在這種情況下使用任何鎖定,其中您有處理從數據庫讀取的工作人員。鎖和信號量一般會導致很多問題,最終會給你留下一堆損壞的數據。相信我。去過也做過。在這種情況下,您需要小心并避免使用它們。例如,如果您希望保留數據和維護地圖但不用于實際處理,則鎖定是很好的。

通過鎖定 go 例程,你會不必要地減慢你的 go 程序。Go 旨在盡可能快地處理事情。不要壓著他。

這是我自己的一些理論,可以幫助您更好地理解我想說的內容:

  • 為了處理工人限制為 3。只需生成 3 個不同的從隊列中選擇的 goroutine。Worker 永遠不會從頻道接受相同的工作,所以你在這里很安全。

  • make() 已經完成了內部通道限制,可以很好地在這種情況下使用。該通道限制是實際的第二個參數。所以如果你寫

    隊列 := make(chan RowInfo, 10)

    這意味著這個隊列最多可以占用10個 RowInfo。如果聚合到此隊列中的循環達到 10 個,它將鎖定并等待工作人員從通道中釋放一項。因此,一旦隊列達到 9,數據庫聚合器將寫入第 10 個,而 worker 將取出第 10 個。


通過這種方式,您可以擁有 golang 的自然工作流程:) 這也稱為生成pre-workers

package main


import (

    "fmt"

    "os"

    "os/signal"

    "syscall"

    "time"

)


// RowInfo holds the job info

type RowInfo struct {

    ID int

}


func worker(queue chan RowInfo, done chan bool) {

    fmt.Println("Starting worker...")


    for {

        select {

        case row := <-queue:

            fmt.Printf("Got row info: %v \n", row)

            // Keep it for second so we can see actual queue lock working

            time.Sleep(1 * time.Second)


        case <-time.After(10 * time.Second):

            fmt.Printf("This job is taking way too long. Let's clean it up now by lets say write write in database that job has failed so it can be restarted again when time is right.")

        case <-done:

            fmt.Printf("Got quit signal... Killing'em all")

            break

        }

    }

}


func handleSigterm(kill chan os.Signal, done chan bool) {

    select {

    case _ = <-kill:

        close(done)

    }

}


func main() {


    // Do not allow more than 10 records to be in the channel.

    queue := make(chan RowInfo, 10)

    done := make(chan bool)


    kill := make(chan os.Signal, 1)


    signal.Notify(kill, os.Interrupt)

    signal.Notify(kill, syscall.SIGTERM)


    go handleSigterm(kill, done)


    for i := 0; i < 3; i++ {

        go worker(queue, done)

    }


    // Should be infinite loop in the end...

    go func() {

        for i := 0; i < 100; i++ {

            fmt.Printf("Queueing: %v \n", i)

            queue <- RowInfo{ID: i}

        }

    }()


    <-done

    // Give it some time to process things before shutting down. This is bad way of doing things

    // but is efficient for this example

    time.Sleep(5 * time.Second)

}

至于管理數據庫狀態,您可以在數據庫中顯示“進行中”的狀態,因此每次選擇您時,也要對該行進行更新,以表明正在進行中。這當然是一種方法。通過在 golang 中保留某種映射,我會說你會比需要的更多地折磨你的服務。


查看完整回答
反對 回復 2021-12-13
  • 2 回答
  • 0 關注
  • 165 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號