亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

golang生產者消費者接收到的消息數

golang生產者消費者接收到的消息數

Go
開滿天機 2022-10-24 16:06:26
我在 golang 中編寫了生產者-消費者模式。讀取多個 csv 文件并處理記錄。我正在一口氣讀取 csv 文件的所有記錄。我想以包括所有 csv 文件在內的總記錄的 5% 的間隔記錄處理完成的百分比。例如,我有 3 個 csv 要處理,每個有 20、30、50 行/記錄(因此總共要處理 100 條記錄)想要在處理 5 條記錄時記錄進度。func processData(inputCSVFiles []string) {    producerCount := len(inputCSVFiles)    consumerCount := producerCount    link := make(chan []string, 100)    wp := &sync.WaitGroup{}    wc := &sync.WaitGroup{}    wp.Add(producerCount)    wc.Add(consumerCount)    for i := 0; i < producerCount; i++ {        go produce(link, inputCSVFiles[i], wp)    }    for i := 0; i < consumerCount; i++ {        go consume(link, wc)    }    wp.Wait()    close(link)    wc.Wait()    fmt.Println("Completed data migration process for all CSV data files.")}func produce(link chan<- []string, filePath string, wg *sync.WaitGroup) {    defer wg.Done()    records := readCsvFile(filePath)    totalNumberOfRecords := len(records)    for _, record := range records {        link <- record    }}func consume(link <-chan []string, wg *sync.WaitGroup) {    defer wg.Done()    for record := range link {        // process csv record    }}
查看完整描述

1 回答

?
浮云間

TA貢獻1829條經驗 獲得超4個贊

我使用了原子變量和計數器通道,其中消費者將在處理記錄時推送計數,其他 goroutine 將從通道中讀取并計算總處理記錄百分比。


var progressPercentageStep float64 = 5.0

var totalRecordsToProcess int32


func processData(inputCSVFiles []string) {

        producerCount := len(inputCSVFiles)

        consumerCount := producerCount

        link := make(chan []string, 100)

        counter := make(chan int, 100)

        defer close(counter)

        wp := &sync.WaitGroup{}

        wc := &sync.WaitGroup{}

    

        wp.Add(producerCount)

        wc.Add(consumerCount)

    

        for i := 0; i < producerCount; i++ {

            go produce(link, inputCSVFiles[i], wp)

        }


        go progressStats(counter)


        for i := 0; i < consumerCount; i++ {

            go consume(link, wc)

        }

        wp.Wait()

        close(link)

        wc.Wait()

        

    }

    

    func produce(link chan<- []string, filePath string, wg *sync.WaitGroup) {

        defer wg.Done()

        records := readCsvFile(filePath)

        atomic.AddInt32(&totalRecordsToProcess, int32(len(records)))

        for _, record := range records {

            link <- record

        }

    }

    

    func consume(link <-chan []string,counter chan<- int, wg *sync.WaitGroup) {

        defer wg.Done()

        for record := range link {

            // process csv record

            counter <- 1

        }

    }

    

func progressStats(counter <-chan int) {

    var feedbackThreshold = progressPercentageStep

    for count := range counter {

        totalRemaining := atomic.AddInt32(&totalRecordsToProcess, -count)

        donePercent := 100.0 * processed / totalRemaining

        // log progress

        if donePercent >= feedbackThreshold {

            log.Printf("Progress ************** Total Records: %d, Processed Records : %d, Processed Percentage: %.2f **************\n", totalRecordsToProcess, processed, donePercent)

            feedbackThreshold += progressPercentageStep

        }

    }

}


查看完整回答
反對 回復 2022-10-24
  • 1 回答
  • 0 關注
  • 122 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號