我嘗試為 NATS 限制隊列編寫訂閱者:sub, err := js.SubscribeSync(fullSubject, nats.Context(ctx))if err != nil { return err}msg, err := sub.NextMsgWithContext(ctx)if err != nil { if errors.Is(err, nats.ErrSlowConsumer) { log.Printf("Slow consumer error returned. Waiting for reset...") time.Sleep(50 * time.Millisecond) continue } else { return err }}msg.InProgress()var message pnats.NatsMessageif err := conn.unmarshaller(msg.Data, &message); err != nil { msg.Term() return err}actualSubject := message.Context.FullSubject()handler, ok := callbacks[message.Context.Category]if !ok { msg.Nak() continue}callback, err := handler(&message)if err == nil { msg.Ack() msg.Term()} else { msg.Nak() return err}callback(ctx)此代碼的目標是使用有關多個主題的任何消息并調用與該主題關聯的回調函數。handler這段代碼有效,但我遇到的問題是,如果該函數未返回錯誤,我希望在調用后刪除該消息。我以為那是msg.Term在做什么,但我仍然看到隊列中的所有消息。我最初是圍繞一個工作隊列設計的,但我希望它能與多個訂閱者一起工作,所以我不得不重新設計它。有什么辦法可以使這項工作?
1 回答

梵蒂岡之花
TA貢獻1900條經驗 獲得超5個贊
根據提供的代碼,我假設您在使用 JetStream 庫創建訂閱時沒有提供流和消費者信息。
在該方法的文檔SubscribeSync
中,它說當未提供流和消費者信息時,庫將創建一個臨時消費者,消費者的名稱由服務器選擇。它還試圖弄清楚訂閱是針對哪個流。
這是我相信在您的代碼中發生的事情:
當您調用該
SubscribeSync
方法時,將使用您提供的主題創建一個臨時消費者。當
msg.Ack
和msg.Term
被調用時,您確實確認了該消息,但僅限于當前消費者。下次調用該
SubscribeSync
方法時,將創建一個新的臨時消費者,其中包含您已在另一個消費者上刪除的消息。Jetstream 的流、消費者和訂閱概念就是這樣設計的。
根據您想要完成的任務,這里有一些建議:
使用普通的 NATS 核心庫來處理發布/訂閱或隊列。不要使用 JetStream。NATS Core 庫直接處理主題,而 Jetstream 庫在未提供信息的情況下創建額外的東西(流和消費者)。
使用 JetStream,但通過代碼或直接在 NATS 服務器上自己創建流和持久消費者。這樣,在定義了流和消費者的情況下,您應該能夠使其按預期工作。
- 1 回答
- 0 關注
- 176 瀏覽
添加回答
舉報
0/150
提交
取消