1 回答

TA貢獻2051條經驗 獲得超10個贊
在深入研究您的解決方案及其問題之前,讓我再次推薦此答案中介紹的另一種 Broker 方法:如何使用通道廣播消息
現在進入您的解決方案。
每當你啟動 goroutine 時,請始終考慮它將如何結束,并確保如果 goroutine 不應該在應用的生命周期內運行,請確保它確實如此。
// ISSUE1: These goroutines apparently do not exit properly...
go func(ch chan interface{}) {
ch <- msg
}(sub.Data)
此 goroutine 嘗試在 上發送值。這可能是一個阻塞操作:如果 的緩沖區已滿并且 上沒有現成的接收器,它將阻塞。這是脫離了發射的goroutine的控制,也脫離了對包裝的控制。在某些情況下,這可能很好,但這已經給軟件包的用戶帶來了負擔。盡量避免這些。嘗試創建易于使用且難以濫用的 API。chchchpubsub
此外,僅僅為了在頻道上發送價值而啟動 goroutine 是一種資源浪費(goroutine 既便宜又輕便,但你不應該盡可能地向它們發送垃圾郵件)。
你這樣做是因為你不想被阻止。為避免阻塞,您可以使用具有“合理”高緩沖器的緩沖通道。是的,這并不能解決阻塞問題,只能幫助“慢速”客戶端從通道接收。
要“真正”避免在不啟動 goroutine 的情況下阻塞,您可以使用非阻塞發送:
select {
case ch <- msg:
default:
// ch's buffer is full, we cannot deliver now
}
如果發送可以繼續,它將發生。如果沒有,則立即選擇分支。你必須決定該怎么做?!皝G失”消息是否可以接受?等到“放棄”可以接受一段時間嗎?或者是否可以啟動一個goroutine來執行此操作(但隨后您將回到我們在這里嘗試解決的問題)?或者,在客戶端可以從通道接收之前,是否可以被阻止...chdefault
選擇合理的高緩沖區,如果遇到它仍然變滿的情況,在客戶端可以前進并從消息接收之前,阻止可能是可以接受的。如果不能,則整個應用可能處于不可接受的狀態,并且“掛起”或“崩潰”可能是可以接受的。
// ISSUE2: close the channel async with a delay to ensure
// nothing will be written to the channel anymore
// via a pending goroutine from Publish()
go func(ch chan interface{}) {
time.Sleep(500 * time.Millisecond)
close(ch)
}(s.Data)
關閉通道是向接收器發出的信號,表示通道上不會發送更多值。因此,關閉通道始終是發送者的工作(和責任)。啟動 goroutine 以關閉通道,您將該工作和責任“移交給”另一個不會與發送方同步的“實體”(goroutine)。這可能很容易導致死機(在閉合通道上發送是運行時死機,有關其他公理,請參閱未初始化的通道如何工作?)。別這樣。
是的,這是必要的,因為您啟動了goroutines來發送。如果你不這樣做,那么你可以“就地”關閉,而不啟動goroutine,因為這樣發送者和關閉者將是同一個實體:它本身,其發送和關閉操作受互斥鎖保護。因此,解決第一個問題自然而然地解決了第二個問題。Pubsub
通常,如果一個通道有多個發送方,則必須協調關閉通道。必須有一個實體(通常不是任何發送方)等待所有發送方完成,實際上使用 一個 ,然后該單個實體可以安全地關閉通道。請參閱關閉長度未知的通道。sync.WaitGroup
- 1 回答
- 0 關注
- 78 瀏覽
添加回答
舉報