亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

將數據從一個戈魯丁發送到多個其他戈魯丁

將數據從一個戈魯丁發送到多個其他戈魯丁

Go
小怪獸愛吃肉 2022-09-05 09:19:27
在項目中,程序通過websocket接收數據。此數據需要由 n 種算法處理。算法的數量可以動態變化。我的嘗試是創建一些發布/訂閱模式,可以在其中啟動和取消訂閱。事實證明,這比預期的更具挑戰性。以下是我想出的(基于 https://eli.thegreenplace.net/2020/pubsub-using-channels-in-go/):package pubsubimport (    "context"    "sync"    "time")type Pubsub struct {    sync.RWMutex    subs   []*Subsciption    closed bool}func New() *Pubsub {    ps := &Pubsub{}    ps.subs = []*Subsciption{}    return ps}func (ps *Pubsub) Publish(msg interface{}) {    ps.RLock()    defer ps.RUnlock()    if ps.closed {        return    }    for _, sub := range ps.subs {        // ISSUE1: These goroutines apparently do not exit properly...         go func(ch chan interface{}) {            ch <- msg        }(sub.Data)    }}func (ps *Pubsub) Subscribe() (context.Context, *Subsciption, error) {    ps.Lock()    defer ps.Unlock()    // prep channel    ctx, cancel := context.WithCancel(context.Background())    sub := &Subsciption{        Data:   make(chan interface{}, 1),        cancel: cancel,        ps:     ps,    }    // prep subsciption    ps.subs = append(ps.subs, sub)    return ctx, sub, nil}正如評論中提到的,這(至少)有兩個問題:問題 1:在實現中運行了一段時間后,我得到了一些這樣的錯誤:goroutine 120624 [runnable]: bm/internal/pubsub.(*Pubsub).Publish.func1(0x8586c0, 0xc00b44e880, 0xc008617740)     /home/X/Projects/bm/internal/pubsub/pubsub.go:30created by bookmaker/internal/pubsub.(*Pubsub).Publish     /home/X/Projects/bm/internal/pubsub/pubsub.go:30 +0xbb在沒有真正理解這一點的情況下,在我看來,在中創建的goroutines確實會累積/泄漏。這是正確的嗎,我在這里做錯了什么?Publish()問題 2:當我通過它結束訂閱時,它試圖寫入關閉的通道并恐慌。為了緩解這種情況,我創建了一個 goroutine 來關閉延遲通道。這感覺真的偏離了最佳實踐,但我無法找到適當的解決方案。什么是確定性的方法?Unsubscribe()Publish()這里有一個小操場供您測試:https://play.golang.org/p/K-L8vLjt7_9
查看完整描述

1 回答

?
侃侃無極

TA貢獻2051條經驗 獲得超10個贊

在深入研究您的解決方案及其問題之前,讓我再次推薦此答案中介紹的另一種 Broker 方法:如何使用通道廣播消息


現在進入您的解決方案。


每當你啟動 goroutine 時,請始終考慮它將如何結束,并確保如果 goroutine 不應該在應用的生命周期內運行,請確保它確實如此。


// ISSUE1: These goroutines apparently do not exit properly... 

go func(ch chan interface{}) {

    ch <- msg

}(sub.Data)

此 goroutine 嘗試在 上發送值。這可能是一個阻塞操作:如果 的緩沖區已滿并且 上沒有現成的接收器,它將阻塞。這是脫離了發射的goroutine的控制,也脫離了對包裝的控制。在某些情況下,這可能很好,但這已經給軟件包的用戶帶來了負擔。盡量避免這些。嘗試創建易于使用且難以濫用的 API。chchchpubsub


此外,僅僅為了在頻道上發送價值而啟動 goroutine 是一種資源浪費(goroutine 既便宜又輕便,但你不應該盡可能地向它們發送垃圾郵件)。


你這樣做是因為你不想被阻止。為避免阻塞,您可以使用具有“合理”高緩沖器的緩沖通道。是的,這并不能解決阻塞問題,只能幫助“慢速”客戶端從通道接收。


要“真正”避免在不啟動 goroutine 的情況下阻塞,您可以使用非阻塞發送:


select {

case ch <- msg:

default:

    // ch's buffer is full, we cannot deliver now

}

如果發送可以繼續,它將發生。如果沒有,則立即選擇分支。你必須決定該怎么做?!皝G失”消息是否可以接受?等到“放棄”可以接受一段時間嗎?或者是否可以啟動一個goroutine來執行此操作(但隨后您將回到我們在這里嘗試解決的問題)?或者,在客戶端可以從通道接收之前,是否可以被阻止...chdefault


選擇合理的高緩沖區,如果遇到它仍然變滿的情況,在客戶端可以前進并從消息接收之前,阻止可能是可以接受的。如果不能,則整個應用可能處于不可接受的狀態,并且“掛起”或“崩潰”可能是可以接受的。


// ISSUE2: close the channel async with a delay to ensure

// nothing will be written to the channel anymore

// via a pending goroutine from Publish()

go func(ch chan interface{}) {

    time.Sleep(500 * time.Millisecond)

    close(ch)

}(s.Data)

關閉通道是向接收器發出的信號,表示通道上不會發送更多值。因此,關閉通道始終是發送者的工作(和責任)。啟動 goroutine 以關閉通道,您將該工作和責任“移交給”另一個不會與發送方同步的“實體”(goroutine)。這可能很容易導致死機(在閉合通道上發送是運行時死機,有關其他公理,請參閱未初始化的通道如何工作?)。別這樣。


是的,這是必要的,因為您啟動了goroutines來發送。如果你不這樣做,那么你可以“就地”關閉,而不啟動goroutine,因為這樣發送者和關閉者將是同一個實體:它本身,其發送和關閉操作受互斥鎖保護。因此,解決第一個問題自然而然地解決了第二個問題。Pubsub


通常,如果一個通道有多個發送方,則必須協調關閉通道。必須有一個實體(通常不是任何發送方)等待所有發送方完成,實際上使用 一個 ,然后該單個實體可以安全地關閉通道。請參閱關閉長度未知的通道。sync.WaitGroup


查看完整回答
反對 回復 2022-09-05
  • 1 回答
  • 0 關注
  • 78 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號