亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

讓 go 例程等待 rabbitMQ 發送結果

讓 go 例程等待 rabbitMQ 發送結果

Go
嚕嚕噠 2023-05-08 15:33:15
我是 Go 的新手,我想制作一個管道來翻譯我收到的每個請求,方法是將它發送到第一個隊列 (TEST),然后從最后一個隊列 (RESULT) 中獲取最終結果并將其作為響應發回。我面臨的問題是,響應永遠不會等到所有結果從隊列中返回。這是代碼:func main() {    requests := []int{3, 4, 5, 6, 7}    var wg sync.WaitGroup    wg.Add(1)    resArr := []string{}    go func() {        for _, r := range requests {            rabbitSend("TEST", r)            resArr = append(resArr, <-rabbitReceive("RESULT"))        }        defer wg.Done()    }()    wg.Wait()    log.Println("Result", resArr)}兔子發送方法:func rabbitSend(queueName string, msg int) {    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")    failOnError(err, "Failed to connect to RabbitMQ")    defer conn.Close()    ch, err := conn.Channel()    failOnError(err, "Failed to open a channel")    defer ch.Close()    q, err := ch.QueueDeclare(        queueName, // name        true,      // durable        false,     // delete when unused        false,     // exclusive        false,     // no-wait        nil,       // arguments    )    failOnError(err, "Failed to declare a queue")    body, _ := json.Marshal(msg)    err = ch.Publish(        "",     // exchange        q.Name, // routing key        false,  // mandatory        false,  // immediate        amqp.Publishing{            ContentType: "application/json",            Body:        []byte(body),        })    log.Printf("[x] Sent %s to %s", body, q.Name)    failOnError(err, "Failed to publish a message")}兔子接收方法:func rabbitReceive(queueName string) <-chan string {    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")    failOnError(err, "Failed to connect to RabbitMQ")    defer conn.Close()    ch, err := conn.Channel()    failOnError(err, "Failed to open a channel")    defer ch.Close()    q, err := ch.QueueDeclare(        queueName, // name        true,      // durable        false,     // delete when usused        false,     // exclusive        false,     // no-waits        nil,       // arguments    )
查看完整描述

2 回答

?
侃侃爾雅

TA貢獻1801條經驗 獲得超16個贊

修改你的func rabbitReceive(queueName string) <-chan string如下:


 func rabbitReceive(queueName string) <-chan string {

    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

    failOnError(err, "Failed to connect to RabbitMQ")


    ch, err := conn.Channel()

    failOnError(err, "Failed to open a channel")


    q, err := ch.QueueDeclare(

        queueName, // name

        true,      // durable

        false,     // delete when usused

        false,     // exclusive

        false,     // no-waits

        nil,       // arguments

    )

    failOnError(err, "Failed to declare a queue")


    msgs, err := ch.Consume(

        q.Name, // queue

        "",     // consumer

        true,   // auto-ack

        false,  // exclusive

        false,  // no-local

        false,  // no-wait

        nil,    // args

    )

    failOnError(err, "Failed to register a consumer")


    resCh := make(chan string)

    go func() {

        d := <-msgs

        log.Printf("Received a message: %v from %v", string(d.Body), q.Name)

        resCh <- string(d.Body)

        conn.Close()

        ch.Close()

        close(resCh)

    }()

    return resCh

}

以前的代碼導致您出現問題的原因是defer ch.Close(). ch在將響應寫入之前關閉resCh。


查看完整回答
反對 回復 2023-05-08
?
慕哥6287543

TA貢獻1831條經驗 獲得超10個贊

跟進 @nightfury1204 很好的答案,你確實ch在寫信給resCh. 只有一件事,在 go 例程中你想遍歷所有消息,所以更好的方法是:


go func() {

        for d := range msgs {

          log.Printf("Received a message: %v from %v", string(d.Body), q.Name)

          resCh <- string(d.Body)  

        }

        conn.Close()

        ch.Close()

        close(resCh)

    }()


查看完整回答
反對 回復 2023-05-08
  • 2 回答
  • 0 關注
  • 157 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號