2 回答

TA貢獻1783條經驗 獲得超4個贊
根據一些評論,您正在尋找的似乎是某種形式的批處理。
批處理有幾種情況,當您想要獲取批處理并將其一起發送時:
批量大小足夠
已經過了足夠的時間,應該沖洗部分批次
您給出的示例不考慮第二種情況。如果您只是因為停止加載而從不沖水,這可能會導致一些尷尬的行為。
因此,我建議要么查看庫(例如,cloudfoundry/go-batching),要么簡單地使用通道、計時器和選擇語句。
package main
import (
? ? "fmt"
? ? "time"
)
func main() {
? ? ch := make(chan string)
? ? go func() {
? ? ? ? tasks := []string{}
? ? ? ? timer := time.NewTimer(time.Second) // Adjust this based on a reasonable user experience
? ? ? ? for {
? ? ? ? ? ? select {
? ? ? ? ? ? case <-timer.C:
? ? ? ? ? ? ? ? fmt.Println("Flush partial batch due to time")
? ? ? ? ? ? ? ? flush(tasks)
? ? ? ? ? ? ? ? tasks = nil
? ? ? ? ? ? ? ? timer.Reset(time.Second)
? ? ? ? ? ? case data := <-ch:
? ? ? ? ? ? ? ? tasks = append(tasks, data)
? ? ? ? ? ? ? ? // Reset the timer for each data point so that we only flush
? ? ? ? ? ? ? ? // partial batches when we stop receiving data.
? ? ? ? ? ? ? ? if !timer.Stop() {
? ? ? ? ? ? ? ? ? ? <-timer.C
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? timer.Reset(time.Second)
? ? ? ? ? ? ? ? // Guard clause to for batch size
? ? ? ? ? ? ? ? if len(tasks) < 3 {
? ? ? ? ? ? ? ? ? ? fmt.Println("Queue still too small")
? ? ? ? ? ? ? ? ? ? continue
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? flush(tasks)
? ? ? ? ? ? ? ? tasks = nil // reset tasks
? ? ? ? ? ? }
? ? ? ? }
? ? }()
? ? ch <- "Msg 1"
? ? time.Sleep(time.Second)
? ? ch <- "Msg 2"
? ? time.Sleep(time.Second)
? ? ch <- "Msg 3"
? ? time.Sleep(time.Second)
? ? ch <- "Msg 4"
? ? time.Sleep(time.Second)
}
func flush(tasks []string) {
? ? // Guard against emtpy flushes
? ? if len(tasks) == 0 {
? ? ? ? return
? ? }
? ? fmt.Println("Flush")
? ? for _, t := range tasks {
? ? ? ? fmt.Println(t)
? ? }
}

TA貢獻1815條經驗 獲得超13個贊
我可以看到批處理結果的東西是如何有用的。但它確實需要定制解決方案。有很多方法可以解決這個問題——我試過使用Sync.WaitGroup但它變得很亂。似乎使用 async.Mutex來鎖定批處理功能是最好的方法。但是,當 mutex 是最好的答案時,imo 應該觸發對設計的重新檢查,因為 imo,它應該是最后一個選項。
package main
import (
"context"
"fmt"
"sync"
"sync/atomic"
)
func main() {
ctx, canc := context.WithCancel(context.Background())
acc := NewAccumulator(4, ctx)
go func() {
for i := 0; i < 10; i++ {
acc.Write("hi")
}
canc()
}()
read := acc.ReadChan()
for batch := range read {
fmt.Println(batch)
}
fmt.Println("done")
}
type Accumulator struct {
count int64
size int
in chan string
out chan []string
ctx context.Context
doneFlag int64
mu sync.Mutex
}
func NewAccumulator(size int, parentCtx context.Context) *Accumulator {
a := &Accumulator{
size: size,
in: make(chan string, size),
out: make(chan []string, 1),
ctx: parentCtx,
}
go func() {
<-a.ctx.Done()
atomic.AddInt64(&a.doneFlag, 1)
close(a.in)
a.mu.Lock()
a.batch()
a.mu.Unlock()
close(a.out)
}()
return a
}
func (a *Accumulator) Write(s string) {
if atomic.LoadInt64(&a.doneFlag) > 0 {
panic("write to closed accumulator")
}
a.in <- s
atomic.AddInt64(&a.count, 1)
a.mu.Lock()
if atomic.LoadInt64(&a.count) == int64(a.size) {
a.batch()
}
a.mu.Unlock()
}
func (a *Accumulator) batch() {
batch := make([]string, 0)
for i := 0; i < a.size; i++ {
msg := <-a.in
if msg != "" {
batch = append(batch, msg)
}
}
fmt.Println("batching", batch)
a.out <- batch
atomic.StoreInt64(&a.count, 0)
}
func (a *Accumulator) ReadChan() <-chan []string {
return a.out
}
最好只擁有一個累積字符串的切片,當該切片達到一定大小時,然后開始一些處理。
- 2 回答
- 0 關注
- 143 瀏覽
添加回答
舉報