亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

在 Go 中發送 ack 和 term 后消息仍在 nats 限制隊列中

在 Go 中發送 ack 和 term 后消息仍在 nats 限制隊列中

Go
偶然的你 2023-02-14 15:26:24
我嘗試為 NATS 限制隊列編寫訂閱者:sub, err := js.SubscribeSync(fullSubject, nats.Context(ctx))if err != nil {    return err}msg, err := sub.NextMsgWithContext(ctx)if err != nil {    if errors.Is(err, nats.ErrSlowConsumer) {        log.Printf("Slow consumer error returned. Waiting for reset...")        time.Sleep(50 * time.Millisecond)        continue    } else {        return err    }}msg.InProgress()var message pnats.NatsMessageif err := conn.unmarshaller(msg.Data, &message); err != nil {    msg.Term()    return err}actualSubject := message.Context.FullSubject()handler, ok := callbacks[message.Context.Category]if !ok {    msg.Nak()    continue}callback, err := handler(&message)if err == nil {    msg.Ack()    msg.Term()} else {    msg.Nak()    return err}callback(ctx)此代碼的目標是使用有關多個主題的任何消息并調用與該主題關聯的回調函數。handler這段代碼有效,但我遇到的問題是,如果該函數未返回錯誤,我希望在調用后刪除該消息。我以為那是msg.Term在做什么,但我仍然看到隊列中的所有消息。我最初是圍繞一個工作隊列設計的,但我希望它能與多個訂閱者一起工作,所以我不得不重新設計它。有什么辦法可以使這項工作?
查看完整描述

1 回答

?
梵蒂岡之花

TA貢獻1900條經驗 獲得超5個贊

根據提供的代碼,我假設您在使用 JetStream 庫創建訂閱時沒有提供流和消費者信息。

在該方法的文檔SubscribeSync中,它說當未提供流和消費者信息時,庫將創建一個臨時消費者,消費者的名稱由服務器選擇。它還試圖弄清楚訂閱是針對哪個流。

這是我相信在您的代碼中發生的事情:

  • 當您調用該SubscribeSync方法時,將使用您提供的主題創建一個臨時消費者。

  • msg.Ackmsg.Term被調用時,您確實確認了該消息,但僅限于當前消費者。

  • 下次調用該SubscribeSync方法時,將創建一個新的臨時消費者,其中包含您已在另一個消費者上刪除的消息。Jetstream 的流、消費者和訂閱概念就是這樣設計的。

根據您想要完成的任務,這里有一些建議:

  • 使用普通的 NATS 核心庫來處理發布/訂閱或隊列。不要使用 JetStream。NATS Core 庫直接處理主題,而 Jetstream 庫在未提供信息的情況下創建額外的東西(流和消費者)。

  • 使用 JetStream,但通過代碼或直接在 NATS 服務器上自己創建流和持久消費者。這樣,在定義了流和消費者的情況下,您應該能夠使其按預期工作。


查看完整回答
反對 回復 2023-02-14
  • 1 回答
  • 0 關注
  • 176 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號