亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

使用此代碼(Paho MQTT)作為 GoRoutine 并通過通道傳遞消息以通過

使用此代碼(Paho MQTT)作為 GoRoutine 并通過通道傳遞消息以通過

Go
侃侃無極 2023-07-26 17:20:34
作為標準代碼,我用來發布消息以進行測試:func main() {    opts := MQTT.NewClientOptions().AddBroker("tcp://127.0.0.1:1883")    opts.SetClientID("myclientid_")    opts.SetDefaultPublishHandler(f)    opts.SetConnectionLostHandler(connLostHandler)    opts.OnConnect = func(c MQTT.Client) {        fmt.Printf("Client connected, subscribing to: test/topic\n")        if token := c.Subscribe("logs", 0, nil); token.Wait() && token.Error() != nil {            fmt.Println(token.Error())            os.Exit(1)        }    }    c := MQTT.NewClient(opts)    if token := c.Connect(); token.Wait() && token.Error() != nil {        panic(token.Error())    }    for i := 0; i < 5; i++ {        text := fmt.Sprintf("this is msg #%d!", i)        token := c.Publish("logs", 0, false, text)        token.Wait()    }    time.Sleep(3 * time.Second)    if token := c.Unsubscribe("logs"); token.Wait() && token.Error() != nil {        fmt.Println(token.Error())        os.Exit(1)    }    c.Disconnect(250)}這個效果很好!但是在執行高延遲任務時大量傳遞消息,我的程序性能會很低,所以我必須使用 goroutine 和 Channel。這段代碼正是我想要的!但作為 Golang 中的菜鳥,我不知道如何START()在主函數中運行函數以及要傳遞什么參數!特別是,我將如何使用通道將消息傳遞給工作人員(發布者)?!我們將不勝感激您的幫助!
查看完整描述

2 回答

?
慕斯王

TA貢獻1864條經驗 獲得超2個贊

為什么不將消息發送給一群工作人員呢?


像這樣的東西:


...

    const workerPoolSize = 10 // the number of workers you want to have

    wg := &sync.WaitGroup{}

    wCh := make(chan string)

    wg.Add(workerPoolSize) // you want to wait for 10 workers to finish the job


    // run workers in goroutines

    for i := 0; i < workerPoolSize; i++ {

        go func(wch <-chan string) {

            // get the data from the channel

            for text := range wch {

                c.Publish("logs", 0, false, text)

                token.Wait()

            }

            wg.Done() // worker says that he finishes the job

        }(wCh)

    }


    for i := 0; i < 5; i++ {

        // put the data to the channel

        wCh <- fmt.Sprintf("this is msg #%d!", i)

    }


        close(wCh)

    wg.Wait() // wait for all workers to finish

...


查看完整回答
反對 回復 2023-07-26
?
慕姐8265434

TA貢獻1813條經驗 獲得超2個贊

當您說“在執行高延遲任務時大量傳遞消息”時,我假設您的意思是您想要異步發送消息(因此消息由與主代碼運行不同的 go 例程處理)。

如果是這種情況,那么對您的初始示例進行非常簡單的更改將為您提供:

for i := 0; i < 5; i++ {

? ? ? ? text := fmt.Sprintf("this is msg #%d!", i)

? ? ? ? token := c.Publish("logs", 0, false, text)

? ? ? ? // comment out... token.Wait()

? ? }

注意:您的示例代碼可能會在消息實際發送之前退出;添加 time.Sleep(10 * time.Second) 會給它時間讓它們熄滅;請參閱下面的代碼了解處理此問題的另一種方法


您的初始代碼在消息發送之前停止的唯一原因是您調用了 token.Wait()。如果您不關心錯誤(并且您不檢查錯誤,所以我假設您不關心),那么調用 token.Wait() 就沒有什么意義(它只是等待消息發送;消息將消失無論你是否調用 token.Wait() )。


如果您想記錄任何錯誤,您可以使用類似以下內容:


for i := 0; i < 5; i++ {

? ? ? ? text := fmt.Sprintf("this is msg #%d!", i)

? ? ? ? token := c.Publish("logs", 0, false, text)

? ? ? ? go func(){

? ? ? ? ? ? token.Wait()

? ? ? ? ? ? err := token.Error()

? ? ? ? ? ? if err != nil {

? ? ? ? ? ? ? ? fmt.Printf("Error: %s\n", err.Error()) // or whatever you want to do with your error

? ? ? ? ? ? }

? ? ? ? }()

? ? }

請注意,如果消息傳遞至關重要(但由于您沒有檢查錯誤,我假設它不是),您還需要做一些其他事情。


就您找到的代碼而言;我懷疑這會增加您不需要的復雜性(并且需要更多信息才能解決此問題;例如,MqttProtocol 結構未在您粘貼的位中定義)。


額外的一點......在您的評論中您提到“發布的消息必須排序”。如果這是必要的(因此您想等到每條消息都已送達后再發送另一條消息),那么您需要類似以下內容:


msgChan := make(chan string, 200) // Allow a queue of up to 200 messages

var wg sync.WaitGroup

wg.Add(1)

go func(){ // go routine to send messages from channel

? ? for msg := range msgChan {

? ? ? ? token := c.Publish("logs", 2, false, msg) // Use QOS2 is order is vital

? ? ? ? token.Wait()

? ? ? ? // should check for errors here

? ? }

? ? wg.Done()

}()


for i := 0; i < 5; i++ {

? ? ? ? text := fmt.Sprintf("this is msg #%d!", i)

? ? ? ? msgChan <- text

? ? }

close(msgChan) // this will stop the goroutine (when all messages processed)

wg.Wait() // Wait for all messages to be sent before exiting (may wait for ever is mqtt broker down!)

注意:這與 Ilya Kaznacheev 的解決方案類似(如果將workerPoolSize設置為1并使通道緩沖)


正如您的評論表明等待組使這一點難以理解,這里是另一種可能更清晰的等待方式(等待組通常在您等待多件事情完成時使用;在這個例子中,我們只等待一件事情,所以可以使用更簡單的方法)


msgChan := make(chan string, 200) // Allow a queue of up to 200 messages

done := make(chan struct{}) // channel used to indicate when go routine has finnished


go func(){ // go routine to send messages from channel

? ? for msg := range msgChan {

? ? ? ? token := c.Publish("logs", 2, false, msg) // Use QOS2 is order is vital

? ? ? ? token.Wait()

? ? ? ? // should check for errors here

? ? }

? ? close(done) // let main routine know we have finnished

}()


for i := 0; i < 5; i++ {

? ? ? ? text := fmt.Sprintf("this is msg #%d!", i)

? ? ? ? msgChan <- text

? ? }

close(msgChan) // this will stop the goroutine (when all messages processed)

<-done // wait for publish go routine to complete


查看完整回答
反對 回復 2023-07-26
  • 2 回答
  • 0 關注
  • 178 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號