2 回答

TA貢獻1864條經驗 獲得超2個贊
為什么不將消息發送給一群工作人員呢?
像這樣的東西:
...
const workerPoolSize = 10 // the number of workers you want to have
wg := &sync.WaitGroup{}
wCh := make(chan string)
wg.Add(workerPoolSize) // you want to wait for 10 workers to finish the job
// run workers in goroutines
for i := 0; i < workerPoolSize; i++ {
go func(wch <-chan string) {
// get the data from the channel
for text := range wch {
c.Publish("logs", 0, false, text)
token.Wait()
}
wg.Done() // worker says that he finishes the job
}(wCh)
}
for i := 0; i < 5; i++ {
// put the data to the channel
wCh <- fmt.Sprintf("this is msg #%d!", i)
}
close(wCh)
wg.Wait() // wait for all workers to finish
...

TA貢獻1813條經驗 獲得超2個贊
當您說“在執行高延遲任務時大量傳遞消息”時,我假設您的意思是您想要異步發送消息(因此消息由與主代碼運行不同的 go 例程處理)。
如果是這種情況,那么對您的初始示例進行非常簡單的更改將為您提供:
for i := 0; i < 5; i++ {
? ? ? ? text := fmt.Sprintf("this is msg #%d!", i)
? ? ? ? token := c.Publish("logs", 0, false, text)
? ? ? ? // comment out... token.Wait()
? ? }
注意:您的示例代碼可能會在消息實際發送之前退出;添加 time.Sleep(10 * time.Second) 會給它時間讓它們熄滅;請參閱下面的代碼了解處理此問題的另一種方法
您的初始代碼在消息發送之前停止的唯一原因是您調用了 token.Wait()。如果您不關心錯誤(并且您不檢查錯誤,所以我假設您不關心),那么調用 token.Wait() 就沒有什么意義(它只是等待消息發送;消息將消失無論你是否調用 token.Wait() )。
如果您想記錄任何錯誤,您可以使用類似以下內容:
for i := 0; i < 5; i++ {
? ? ? ? text := fmt.Sprintf("this is msg #%d!", i)
? ? ? ? token := c.Publish("logs", 0, false, text)
? ? ? ? go func(){
? ? ? ? ? ? token.Wait()
? ? ? ? ? ? err := token.Error()
? ? ? ? ? ? if err != nil {
? ? ? ? ? ? ? ? fmt.Printf("Error: %s\n", err.Error()) // or whatever you want to do with your error
? ? ? ? ? ? }
? ? ? ? }()
? ? }
請注意,如果消息傳遞至關重要(但由于您沒有檢查錯誤,我假設它不是),您還需要做一些其他事情。
就您找到的代碼而言;我懷疑這會增加您不需要的復雜性(并且需要更多信息才能解決此問題;例如,MqttProtocol 結構未在您粘貼的位中定義)。
額外的一點......在您的評論中您提到“發布的消息必須排序”。如果這是必要的(因此您想等到每條消息都已送達后再發送另一條消息),那么您需要類似以下內容:
msgChan := make(chan string, 200) // Allow a queue of up to 200 messages
var wg sync.WaitGroup
wg.Add(1)
go func(){ // go routine to send messages from channel
? ? for msg := range msgChan {
? ? ? ? token := c.Publish("logs", 2, false, msg) // Use QOS2 is order is vital
? ? ? ? token.Wait()
? ? ? ? // should check for errors here
? ? }
? ? wg.Done()
}()
for i := 0; i < 5; i++ {
? ? ? ? text := fmt.Sprintf("this is msg #%d!", i)
? ? ? ? msgChan <- text
? ? }
close(msgChan) // this will stop the goroutine (when all messages processed)
wg.Wait() // Wait for all messages to be sent before exiting (may wait for ever is mqtt broker down!)
注意:這與 Ilya Kaznacheev 的解決方案類似(如果將workerPoolSize設置為1并使通道緩沖)
正如您的評論表明等待組使這一點難以理解,這里是另一種可能更清晰的等待方式(等待組通常在您等待多件事情完成時使用;在這個例子中,我們只等待一件事情,所以可以使用更簡單的方法)
msgChan := make(chan string, 200) // Allow a queue of up to 200 messages
done := make(chan struct{}) // channel used to indicate when go routine has finnished
go func(){ // go routine to send messages from channel
? ? for msg := range msgChan {
? ? ? ? token := c.Publish("logs", 2, false, msg) // Use QOS2 is order is vital
? ? ? ? token.Wait()
? ? ? ? // should check for errors here
? ? }
? ? close(done) // let main routine know we have finnished
}()
for i := 0; i < 5; i++ {
? ? ? ? text := fmt.Sprintf("this is msg #%d!", i)
? ? ? ? msgChan <- text
? ? }
close(msgChan) // this will stop the goroutine (when all messages processed)
<-done // wait for publish go routine to complete
- 2 回答
- 0 關注
- 178 瀏覽
添加回答
舉報