亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何同時從速率受限的 API 端點進行提?。?/h1>

如何同時從速率受限的 API 端點進行提?。?/h1>
Go
海綿寶寶撒 2022-08-01 15:24:37
我不能把我的頭纏繞在這個問題上。我有一個需要從中提取數據的服務,該服務的速率限制為每秒5個請求,即使在使用包并在每個請求之前設置和調用它時,它有時仍會達到速率限制,我需要退出發送請求。我可能與另一個可能干擾請求預算的服務競爭,因此我想更好地處理它。x/rate/limitrate.Limiter(5,1)我的問題是我需要解決這個問題,我一次處理5個請求,但是當一個請求達到速率限制時,下一個請求也是如此,服務器有時會增加我在發送另一個請求之前必須等待的時間。因此,如果有5個請求發出,如果一個達到速率限制,則其余請求也達到速率限制并且會卡住的可能性更大。如何有效地解決此問題?我需要通過將受速率限制的請求反饋給工作人員來重新處理這些請求。當我達到速率限制時,我正在嘗試停止所有工作線程,在給定的延遲后退,然后繼續處理請求。以下是我擁有的一些模擬代碼示例:package mainimport (    "context"    "log"    "net/http"    "strconv"    "sync"    "time"    "golang.org/x/time/rate")// Rate-limit => 5 req/sconst (    workers = 5)func main() {    ctx, cancel := context.WithCancel(context.Background())    // Mock function to grab all the serials to use in upcoming requests.    serials, err := getAllSerials(ctx)    if err != nil {        panic(err)    }    // Set up for concurrent processing.    jobC := make(chan string)            // job queue    delayC := make(chan int)             // channel to receive delay    resultC := make(chan *http.Response) // channel for results    var wg *sync.WaitGroup    // Set up rate limiter.    limiter := rate.NewLimiter(5, 1)    for i := 0; i < workers; i++ {        wg.Add(1)        go func() {            defer wg.Done()            for s := range jobC {                limiter.Wait(ctx)                res, err := doSomeRequest(s)                if err != nil {                    // Handle error.                    log.Println(err)                }我注意到的是,當4/5請求達到速率限制時,將成功休眠和延遲(所有所有速率限制請求的總時間和時間,其中它只需要是最新的,因為它將具有新的總持續時間等待),但是當所有5個請求都達到速率限制時, 工人被卡住了,沒有從頻道讀取。backOffProcessbackOffProcess實現這一目標的更好方法是什么?
查看完整描述

1 回答

?
拉莫斯之舞

TA貢獻1820條經驗 獲得超10個贊

我真的不明白為什么你的被處在一個單獨的goroutine中。我認為每個工作進程都應該在執行任務之前退縮(如果需要的話)。我看到它是這樣的:backOffProcess


 backOffUntil := time.Now()

 backOffMutex := sync.Mutex{}

 go func() {

            defer wg.Done()


            for s := range jobC {

                <-time.After(time.Until(backOffUntil))

                limiter.Wait(ctx)

                res, err := doSomeRequest(s)

                if err != nil {

                    // Handle error.

                    log.Println(err)

                }


                // Handle rate limit.

                if res.StatusCode == 429 {

                    delay, _ := strconv.Atoi(res.Header.Get("Retry-After"))

                    log.Println("rate limit hit, backing off")


                    // Back off.

                    newbackOffUntil := time.Now().Add(time.Second * delay)

                    backOffMutex.Lock()

                    if newbackOffUntil.Unix() > backOffUntil.Unix() {

                        backOffUntil = newbackOffUntil

                    }

                    backOffMutex.Unlock()


                    // Put serial back into job queue.

                    jobC <- s

                }


                resultC <- res

            }

        }()


查看完整回答
反對 回復 2022-08-01
  • 1 回答
  • 0 關注
  • 125 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號