我有一個 Go 應用程序處理來自單個 RabbitMQ 隊列的事件。我使用github.com/streadway/amqpRabbitMQ 客戶端庫。Go 應用程序在約 2-3 秒內處理每條消息。如果我從內存中輸入消息,則可以并行處理大約 1000 條甚至更多消息。但不幸的是,RabbitMQ 的性能更差。所以,我想更快地消耗隊列中的消息。所以,問題是:如何使用最有效的方式消費消息github.com/streadway/amqp?據我了解,有兩種方法:設置高預取https://godoc.org/github.com/streadway/amqp#Channel.Qos.使用單一消費者 goroutine示例代碼:conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()ch.Qos( 10000, // prefetch count 0, // prefetch size false, // global )msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // NO auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args)for d := range msgs { log.Printf("Received a message: %s", d.Body) err:= processMessage(d) if err != nil { log.Printf("%s : while consuming task", err) d.Nack(false, true) } else { d.Ack(false) } continue // consume other messages}但是processMessage這里會并行調用嗎?但這是 RAM 友好的方法嗎?對于 RabbitMQ 來說,為每個工作進程生成一個新通道不是很戲劇性嗎?那么,問題是,哪種變體更好?更好的性能、更好的內存使用等。那么,這里 RabbitMQ 的最佳用法是什么?更新:目前,我遇到了一個情況,我的工作人員消耗了 VPS 上的所有 RAM,并且被 OOM 殺死。我使用了第二種方法。因此,就我而言,更好的是能夠讓我的工作人員在工作幾分鐘后不會被 OOM 殺死。更新2:nack當worker無法處理消息時,以及ack當worker處理消息時非常重要。所有消息都必須被處理(其客戶分析),但有時工作人員無法處理它,因此它必須通過nack消息將其傳遞給其他工作人員(目前,一些用于處理消息的第 3 方 api 有時只是返回 503 狀態代碼,在此案例消息應傳遞給其他工作人員或重試)。所以,auto-ack不幸的是,使用不是一個選擇。
1 回答

縹緲止盈
TA貢獻2041條經驗 獲得超4個贊
我想每次都processMessage()
在一個新的 goroutine 中運行。
哪個變體更好?
我更喜歡第一個,因為打開/關閉通道有點昂貴(2 + 2 TCP 數據包)。我認為你的 OOM 問題與太多 gorutine 無關,gorutine 很輕,只需要 5KB 左右。所以問題很可能是由你的processMessage()
.
我認為github.com/streadway/amqp
通道消費操作是線程/goroutine安全的,因此如果你只做一些消費操作,那么在goruntine之間共享通道是安全的。
- 1 回答
- 0 關注
- 206 瀏覽
添加回答
舉報
0/150
提交
取消