2 回答

TA貢獻1873條經驗 獲得超9個贊
修改中斷處理以在中斷時關閉通道。這允許多個 goroutines 通過等待通道關閉來等待事件。
shutdown := make(chan struct{})
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
go func() {
? ? <-interrupt
? ? log.Println("interrupt")
? ? close(shutdown)
}()
將每個連接代碼移動到一個函數中。這段代碼是從問題中復制粘貼的,有兩個變化:中斷通道被關閉通道替換;該函數在函數完成時通知 sync.WaitGroup。
func connect(u string, shutdown chan struct{}, wg *sync.WaitGroup) {
? ? defer wg.Done()
? ? log.Printf("connecting to %s", u)
? ? c, _, err := websocket.DefaultDialer.Dial(u, nil)
? ? if err != nil {
? ? ? ? log.Fatal("dial:", err)
? ? }
? ? defer c.Close()
? ? done := make(chan struct{})
? ? go func() {
? ? ? ? defer close(done)
? ? ? ? for {
? ? ? ? ? ? _, message, err := c.ReadMessage()
? ? ? ? ? ? if err != nil {
? ? ? ? ? ? ? ? log.Println("read:", err)
? ? ? ? ? ? ? ? return
? ? ? ? ? ? }
? ? ? ? ? ? log.Printf("recv: %s", message)
? ? ? ? }
? ? }()
? ? ticker := time.NewTicker(time.Second)
? ? defer ticker.Stop()
? ? for {
? ? ? ? select {
? ? ? ? case <-done:
? ? ? ? ? ? return
? ? ? ? case t := <-ticker.C:
? ? ? ? ? ? err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))
? ? ? ? ? ? if err != nil {
? ? ? ? ? ? ? ? log.Println("write:", err)
? ? ? ? ? ? ? ? return
? ? ? ? ? ? }
? ? ? ? case <-shutdown:
? ? ? ? ? ? // Cleanly close the connection by sending a close message and then
? ? ? ? ? ? // waiting (with timeout) for the server to close the connection.
? ? ? ? ? ? err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
? ? ? ? ? ? if err != nil {
? ? ? ? ? ? ? ? log.Println("write close:", err)
? ? ? ? ? ? ? ? return
? ? ? ? ? ? }
? ? ? ? ? ? select {
? ? ? ? ? ? case <-done:
? ? ? ? ? ? case <-time.After(time.Second):
? ? ? ? ? ? }
? ? ? ? ? ? return
? ? ? ? }
? ? }
}
在中聲明一個sync.WaitGroupmain()
。對于要連接到的每個 websocket 端點,遞增 WaitGroup 并啟動 goroutine 以連接該端點。啟動 goroutine 后,在 WaitGroup 上等待 goroutine 完成。
var wg sync.WaitGroup
for _, u := range endpoints { // endpoints is []string?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // where elements are URLs?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // of endpoints to connect to.
? ? wg.Add(1)
? ? go connect(u, shutdown, &wg)
}
wg.Wait()

TA貢獻1786條經驗 獲得超13個贊
與每個不同服務器的通信是否完全獨立于其他服務器?如果是的話,我會以這樣的方式四處走動:
在main中創建一個帶有取消函數的上下文
在 main 中創建一個等待組來跟蹤啟動的 goroutines
對于每個服務器,添加到等待組,從傳遞上下文和等待組引用的主函數啟動一個新的 goroutine
main進入一個 for/select 循環,監聽信號,如果信號到達,調用 cancelfunc 并等待等待組。
main還可以監聽來自 goroutines 的結果 chan,并可能自己打印結果,因為 goroutines 不應該直接這樣做。
正如我們所說,每個goroutine都有 wg 的引用、上下文和可能的 chan 以返回結果?,F在,如果 goroutine 必須只做一件事,或者它是否需要做一系列事情,這個方法就會分裂。對于第一種方法
如果只有一件事要做,我們會遵循此處描述的方法(觀察到異步他會依次啟動一個新的 goroutine 來執行 DoSomething() 步驟,該步驟將在通道上返回結果)這允許它能夠隨時接受取消信號。由您決定您想要的非阻塞程度以及您想要響應取消信號的速度。此外,將關聯的上下文傳遞給 goroutines 的好處是您可以調用 Context enabled大多數庫函數的版本。例如,如果您希望您的撥號超時為 1 分鐘,您可以創建一個新的上下文,該上下文從傳遞的超時開始,然后使用該超時創建 DialContext。這允許撥號從超時或父(您在 main 中創建的)上下文的 cancelfunc 被調用時停止。
如果需要做更多的事情,我通常更喜歡用 goroutine 做一件事,讓它調用一個新的 goroutine 來執行下一步(將所有引用傳遞到管道中)然后退出。
這種方法可以很好地擴展取消,并能夠在任何步驟停止流水線,并且可以輕松地為可能需要太長時間的步驟支持帶有 dealines 的上下文。
- 2 回答
- 0 關注
- 286 瀏覽
添加回答
舉報