亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Golang - 為到不同服務器的多個連接擴展 websocket 客戶端

Golang - 為到不同服務器的多個連接擴展 websocket 客戶端

Go
胡子哥哥 2023-06-05 16:54:42
我有一個 websocket 客戶端。實際上,它比下面顯示的基本代碼復雜得多。我現在需要擴展此客戶端代碼以打開到多個服務器的連接。最終,從服務器接收到消息時需要執行的任務是相同的。處理這個問題的最佳方法是什么?正如我上面所說,接收消息時執行的實際代碼比示例中顯示的要復雜得多。package mainimport (        "flag"        "log"        "net/url"        "os"        "os/signal"        "time"        "github.com/gorilla/websocket")var addr = flag.String("addr", "localhost:1234", "http service address")func main() {        flag.Parse()        log.SetFlags(0)        interrupt := make(chan os.Signal, 1)        signal.Notify(interrupt, os.Interrupt)        // u := url.URL{Scheme: "ws", Host: *addr, Path: "/echo"}        u := url.URL{Scheme: "ws", Host: *addr, Path: "/"}        log.Printf("connecting to %s", u.String())        c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)        if err != nil {                log.Fatal("dial:", err)        }        defer c.Close()        done := make(chan struct{})        go func() {                defer close(done)                for {                        _, message, err := c.ReadMessage()                        if err != nil {                                log.Println("read:", err)                                return                        }                        log.Printf("recv: %s", message)                }        }()        ticker := time.NewTicker(time.Second)        defer ticker.Stop()        for {                select {                case <-done:                        return                case t := <-ticker.C:                        err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))                        if err != nil {                                log.Println("write:", err)                                return                        }
查看完整描述

2 回答

?
眼眸繁星

TA貢獻1873條經驗 獲得超9個贊

修改中斷處理以在中斷時關閉通道。這允許多個 goroutines 通過等待通道關閉來等待事件。


shutdown := make(chan struct{})

interrupt := make(chan os.Signal, 1)

signal.Notify(interrupt, os.Interrupt)

go func() {

? ? <-interrupt

? ? log.Println("interrupt")

? ? close(shutdown)

}()

將每個連接代碼移動到一個函數中。這段代碼是從問題中復制粘貼的,有兩個變化:中斷通道被關閉通道替換;該函數在函數完成時通知 sync.WaitGroup。


func connect(u string, shutdown chan struct{}, wg *sync.WaitGroup) {

? ? defer wg.Done()


? ? log.Printf("connecting to %s", u)

? ? c, _, err := websocket.DefaultDialer.Dial(u, nil)

? ? if err != nil {

? ? ? ? log.Fatal("dial:", err)

? ? }

? ? defer c.Close()


? ? done := make(chan struct{})


? ? go func() {

? ? ? ? defer close(done)

? ? ? ? for {

? ? ? ? ? ? _, message, err := c.ReadMessage()

? ? ? ? ? ? if err != nil {

? ? ? ? ? ? ? ? log.Println("read:", err)

? ? ? ? ? ? ? ? return

? ? ? ? ? ? }

? ? ? ? ? ? log.Printf("recv: %s", message)

? ? ? ? }

? ? }()


? ? ticker := time.NewTicker(time.Second)

? ? defer ticker.Stop()


? ? for {

? ? ? ? select {

? ? ? ? case <-done:

? ? ? ? ? ? return

? ? ? ? case t := <-ticker.C:

? ? ? ? ? ? err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))

? ? ? ? ? ? if err != nil {

? ? ? ? ? ? ? ? log.Println("write:", err)

? ? ? ? ? ? ? ? return

? ? ? ? ? ? }

? ? ? ? case <-shutdown:

? ? ? ? ? ? // Cleanly close the connection by sending a close message and then

? ? ? ? ? ? // waiting (with timeout) for the server to close the connection.

? ? ? ? ? ? err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))

? ? ? ? ? ? if err != nil {

? ? ? ? ? ? ? ? log.Println("write close:", err)

? ? ? ? ? ? ? ? return

? ? ? ? ? ? }

? ? ? ? ? ? select {

? ? ? ? ? ? case <-done:

? ? ? ? ? ? case <-time.After(time.Second):

? ? ? ? ? ? }

? ? ? ? ? ? return

? ? ? ? }

? ? }

}

在中聲明一個sync.WaitGroupmain()。對于要連接到的每個 websocket 端點,遞增 WaitGroup 并啟動 goroutine 以連接該端點。啟動 goroutine 后,在 WaitGroup 上等待 goroutine 完成。


var wg sync.WaitGroup

for _, u := range endpoints { // endpoints is []string?

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // where elements are URLs?

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // of endpoints to connect to.

? ? wg.Add(1)

? ? go connect(u, shutdown, &wg)

}

wg.Wait()


查看完整回答
反對 回復 2023-06-05
?
開滿天機

TA貢獻1786條經驗 獲得超13個贊

與每個不同服務器的通信是否完全獨立于其他服務器?如果是的話,我會以這樣的方式四處走動:

  • main中創建一個帶有取消函數的上下文

  • 在 main 中創建一個等待組來跟蹤啟動的 goroutines

  • 對于每個服務器,添加到等待組,從傳遞上下文和等待組引用的主函數啟動一個新的 goroutine

  • main進入一個 for/select 循環,監聽信號,如果信號到達,調用 cancelfunc 并等待等待組。

  • main還可以監聽來自 goroutines 的結果 chan,并可能自己打印結果,因為 goroutines 不應該直接這樣做。

  • 正如我們所說,每個goroutine都有 wg 的引用、上下文和可能的 chan 以返回結果?,F在,如果 goroutine 必須只做一件事,或者它是否需要做一系列事情,這個方法就會分裂。對于第一種方法

  • 如果只有一件事要做,我們會遵循此處描述的方法(觀察到異步他會依次啟動一個新的 goroutine 來執行 DoSomething() 步驟,該步驟將在通道上返回結果)這允許它能夠隨時接受取消信號。由您決定您想要的非阻塞程度以及您想要響應取消信號的速度。此外,將關聯的上下文傳遞給 goroutines 的好處是您可以調用 Context enabled大多數庫函數的版本。例如,如果您希望您的撥號超時為 1 分鐘,您可以創建一個新的上下文,該上下文從傳遞的超時開始,然后使用該超時創建 DialContext。這允許撥號從超時或父(您在 main 中創建的)上下文的 cancelfunc 被調用時停止。

  • 如果需要做更多的事情,我通常更喜歡用 goroutine 做一件事,讓它調用一個新的 goroutine 來執行下一步(將所有引用傳遞到管道中)然后退出。

這種方法可以很好地擴展取消,并能夠在任何步驟停止流水線,并且可以輕松地為可能需要太長時間的步驟支持帶有 dealines 的上下文。


查看完整回答
反對 回復 2023-06-05
  • 2 回答
  • 0 關注
  • 286 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號