亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

在順序執行之前等待通道中的 N 個項目

在順序執行之前等待通道中的 N 個項目

Go
慕哥9229398 2023-05-08 15:49:57
所以我很新去!但是我對我想嘗試的事情有這個想法。我想要一個從通道接受字符串的 go 例程,但只有在它收到 N 個字符串后才應該對它們執行。我四處尋找類似的問題或案例,但我只發現了一些想法是并行執行多個例程并等待匯總結果。我想到了創建一個數組并將其傳遞給長度足夠的例程的想法。但是我想保持一定的關注點分離并在接收端控制它。我的問題是。這是出于某種原因的不良做法嗎?有沒有更好的方法來做到這一點,它是什么?func main() {    ch := make(chan string)    go func() {        tasks := []string{}        for {            tasks = append(tasks,<- ch)            if len(tasks) < 3 {                fmt.Println("Queue still to small")            }            if len(tasks) > 3 {                for i := 0; i < len(tasks); i++ {                    fmt.Println(tasks[i])                }            }        }    }()    ch <- "Msg 1"    time.Sleep(time.Second)    ch <- "Msg 2"    time.Sleep(time.Second)    ch <- "Msg 3"    time.Sleep(time.Second)    ch <- "Msg 4"    time.Sleep(time.Second)}編輯更簡單更準確的例子。
查看完整描述

2 回答

?
慕娘9325324

TA貢獻1783條經驗 獲得超4個贊

根據一些評論,您正在尋找的似乎是某種形式的批處理。

批處理有幾種情況,當您想要獲取批處理并將其一起發送時:

  1. 批量大小足夠

  2. 已經過了足夠的時間,應該沖洗部分批次

您給出的示例不考慮第二種情況。如果您只是因為停止加載而從不沖水,這可能會導致一些尷尬的行為。

因此,我建議要么查看庫(例如,cloudfoundry/go-batching),要么簡單地使用通道、計時器和選擇語句。

package main


import (

? ? "fmt"

? ? "time"

)


func main() {

? ? ch := make(chan string)

? ? go func() {

? ? ? ? tasks := []string{}

? ? ? ? timer := time.NewTimer(time.Second) // Adjust this based on a reasonable user experience

? ? ? ? for {

? ? ? ? ? ? select {

? ? ? ? ? ? case <-timer.C:

? ? ? ? ? ? ? ? fmt.Println("Flush partial batch due to time")

? ? ? ? ? ? ? ? flush(tasks)

? ? ? ? ? ? ? ? tasks = nil

? ? ? ? ? ? ? ? timer.Reset(time.Second)

? ? ? ? ? ? case data := <-ch:

? ? ? ? ? ? ? ? tasks = append(tasks, data)


? ? ? ? ? ? ? ? // Reset the timer for each data point so that we only flush

? ? ? ? ? ? ? ? // partial batches when we stop receiving data.

? ? ? ? ? ? ? ? if !timer.Stop() {

? ? ? ? ? ? ? ? ? ? <-timer.C

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? timer.Reset(time.Second)


? ? ? ? ? ? ? ? // Guard clause to for batch size

? ? ? ? ? ? ? ? if len(tasks) < 3 {

? ? ? ? ? ? ? ? ? ? fmt.Println("Queue still too small")

? ? ? ? ? ? ? ? ? ? continue

? ? ? ? ? ? ? ? }


? ? ? ? ? ? ? ? flush(tasks)

? ? ? ? ? ? ? ? tasks = nil // reset tasks

? ? ? ? ? ? }

? ? ? ? }

? ? }()


? ? ch <- "Msg 1"

? ? time.Sleep(time.Second)

? ? ch <- "Msg 2"

? ? time.Sleep(time.Second)

? ? ch <- "Msg 3"

? ? time.Sleep(time.Second)

? ? ch <- "Msg 4"

? ? time.Sleep(time.Second)

}


func flush(tasks []string) {

? ? // Guard against emtpy flushes

? ? if len(tasks) == 0 {

? ? ? ? return

? ? }


? ? fmt.Println("Flush")

? ? for _, t := range tasks {

? ? ? ? fmt.Println(t)

? ? }

}


查看完整回答
反對 回復 2023-05-08
?
蕭十郎

TA貢獻1815條經驗 獲得超13個贊

我可以看到批處理結果的東西是如何有用的。但它確實需要定制解決方案。有很多方法可以解決這個問題——我試過使用Sync.WaitGroup但它變得很亂。似乎使用 async.Mutex來鎖定批處理功能是最好的方法。但是,當 mutex 是最好的答案時,imo 應該觸發對設計的重新檢查,因為 imo,它應該是最后一個選項。


package main


import (

    "context"

    "fmt"

    "sync"

    "sync/atomic"

)


func main() {


    ctx, canc := context.WithCancel(context.Background())

    acc := NewAccumulator(4, ctx)

    go func() {

        for i := 0; i < 10; i++ {

            acc.Write("hi")

        }

        canc()

    }()


    read := acc.ReadChan()

    for batch := range read {

        fmt.Println(batch)

    }

    fmt.Println("done")

}


type Accumulator struct {

    count    int64

    size     int

    in       chan string

    out      chan []string

    ctx      context.Context

    doneFlag int64

    mu   sync.Mutex

}


func NewAccumulator(size int, parentCtx context.Context) *Accumulator {

    a := &Accumulator{

        size: size,

        in:   make(chan string, size),

        out:  make(chan []string, 1),

        ctx:  parentCtx,

    }


    go func() {

        <-a.ctx.Done()

        atomic.AddInt64(&a.doneFlag, 1)

        close(a.in)

        a.mu.Lock()

        a.batch()

        a.mu.Unlock()

        close(a.out)

    }()

    return a

}


func (a *Accumulator) Write(s string) {

    if atomic.LoadInt64(&a.doneFlag) > 0 {

        panic("write to closed accumulator")

    }

    a.in <- s

    atomic.AddInt64(&a.count, 1)

    a.mu.Lock()

    if atomic.LoadInt64(&a.count) == int64(a.size) {

        a.batch()

    }

    a.mu.Unlock()

}


func (a *Accumulator) batch() {

    batch := make([]string, 0)

    for i := 0; i < a.size; i++ {

        msg := <-a.in

        if msg != "" {

            batch = append(batch, msg)

        }

    }

    fmt.Println("batching", batch)

    a.out <- batch

    atomic.StoreInt64(&a.count, 0)

}


func (a *Accumulator) ReadChan() <-chan []string {

    return a.out

}

最好只擁有一個累積字符串的切片,當該切片達到一定大小時,然后開始一些處理。


查看完整回答
反對 回復 2023-05-08
  • 2 回答
  • 0 關注
  • 143 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號