3 回答

TA貢獻1871條經驗 獲得超8個贊
自從其他兩個答案(盡管信息非常豐富,實際上是錯誤的)以來,我發布了一個快速更新,并且我不希望其他人從我接受的答案中得到誤導。您不僅可以使用 zmq 做到這一點,它實際上是默認行為。
訣竅是,如果您的發布者客戶端在不斷丟棄消息之前從未連接到訂閱者服務器(這就是為什么我認為它不緩沖消息),但是如果您的發布者連接到訂閱者并且您重新啟動訂閱者,則發布者將緩沖消息直到達到 HWM,這正是我所要求的...所以簡而言之,發布者想知道另一端是否有人接受消息,只有在此之后它才會緩沖消息...
這是一些示例代碼,演示了這一點(您可能需要進行一些基本編輯才能編譯它)。
我只使用了這個依賴項org.zeromq:jeromq:0.5.1。
zmq-publisher.kt
fun main() {
val uri = "tcp://localhost:3006"
val context = ZContext(1)
val socket = context.createSocket(SocketType.PUB)
socket.hwm = 10000
socket.linger = 0
"connecting to $uri".log()
socket.connect(uri)
fun publish(path: String, msg: Msg) {
">> $path | ${msg.json()}".log()
socket.sendMore(path)
socket.send(msg.toByteArray())
}
var count = 0
while (notInterrupted()) {
val msg = telegramMessage("message : ${++count}")
publish("/some/feed", msg)
println()
sleepInterruptible(1.second)
}
}
而且當然zmq-subscriber.kt
fun main() {
val uri = "tcp://localhost:3006"
val context = ZContext(1)
val socket = context.createSocket(SocketType.SUB)
socket.hwm = 10000
socket.receiveTimeOut = 250
"connecting to $uri".log()
socket.bind(uri)
socket.subscribe("/some/feed")
while (true) {
val path = socket.recvStr() ?: continue
val bytes = socket.recv()
val msg = Msg.parseFrom(bytes)
"<< $path | ${msg.json()}".log()
}
}
嘗試先在沒有訂閱者的情況下運行發布者,然后當您啟動訂閱者時,您錯過了到目前為止的所有消息...現在不重新啟動發布者,停止訂閱者等待一段時間并再次啟動它。
這是我的一項服務實際上從中受益的示例......這是結構[current service]sub:server <= pub:client[service being restarted]sub:server <=* pub:client[multiple publishers]
因為我在中間重新啟動服務,所有發布者開始緩沖他們的消息,每秒觀察約 200 條消息的最終服務觀察到下降到 0(那些 1 或 2 是心跳),然后突然爆發 1000 多條消息,因為所有發布者都刷新了他們的緩沖區(重新啟動大約需要 5 秒)...實際上我在這里沒有丟失任何消息...
請注意,您必須有subscriber:server <= publisher:client
一對(這樣發布者就知道“我需要將這些消息傳遞到一個地方”(您可以嘗試綁定發布者并連接訂閱者,但您將不會再看到發布者緩沖消息,因為它是有問題的,如果剛剛斷開連接的訂閱者這樣做是因為它不再需要數據或因為它失敗)

TA貢獻1830條經驗 獲得超9個贊
首先,歡迎來到 Zen-of-Zero 的世界,其中延遲最為重要
序言:
ZeroMQ 是由 Pieter HINTJENS 的團隊設計的,該團隊由經驗豐富的大師組成,Martin SUSTRIK 名列第一。該設計經過專業設計,以避免任何不必要的延遲。那么詢問是否有(有限的)持久性?不,先生,尚未確認 -PUB/SUB
可擴展的正式通信模式原型不會內置它,因為增加了問題并降低了性能和可擴展性(附加延遲、附加處理、附加內存管理)。
如果需要(有限的)持久性(對于缺少遠程 SUB 端代理的連接),請隨意在應用程序端實現它,或者可以設計和實現一種新的 ZMTP 兼容的此類行為模式原型,擴展 ZeroMQ 框架,如果此類工作進入穩定且公開接受的狀態,但不要求高性能、消除延遲的標準已經打磨了PUB/SUB
幾乎線性的可擴展性 ad astra,以朝這個方向進行修改。這絕對不是一條路可走。
解決方案 ?
應用程序端可以使用雙指針循環緩沖區輕松實現您添加的邏輯,以某種(應用程序端管理)的方式工作-?Persistence-PROXY,但位于PUB
發送方的前面。
如果您的設計還喜歡使用最近提供的內置 ZeroMQ 組件來設置額外socket_monitor
的控制層并在那里接收事件流(如圖所示),那么您的設計可能會成功地從 ZeroMQ 內部細節中擠出一些額外的醬料從 PUB 端Context
實例“內部”,其中一些額外的網絡和連接管理相關事件可能會給您的(應用程序端管理)帶來更多信息-持久性代理
然而,請注意
該
_zmq_socket_monitor()_
方法僅支持面向連接的傳輸,即TCP、IPC和TIPC。
因此,如果計劃使用任何最終有趣的傳輸類,人們可能會直接忘記這一點{ inproc:// | norm:// | pgm:// | epgm:// | vmci:// }
小心 !
“...zmq 優化主題發布?就像如果您持續
topic
快速發布大約 100 個字符長的內容,它實際上是topic
每次都發送還是映射到某個 int 并隨后發送 int...?”
告訴你:
“它總是會發布
topic.
當我使用該pub-sub
模式時,我通常發布topic
第一個消息,然后發布實際消息,因此在訂閱者中,我只讀取第一幀并忽略它,然后讀取實際消息”
ZeroMQ 不是這樣工作的。沒有什么是“單獨”<topic>
后面跟著一個<message-body>
,而是相反
主題過濾TOPIC
的機械化工作方式非常不同。
1)你永遠不知道,誰.connect()
-s:
即幾乎可以肯定版本 2.x 直到版本 4.2+ 將以不同的方式處理主題過濾(ZMTP:RFC 定義初始功能版本握手,以讓Context
-instance決定必須使用哪個版本的主題過濾:
版本 2.x用于將所有消息移動到所有對等方,并讓所有SUB 端(版本 2.x+)傳遞消息(并讓-?SUB
side?Context
-instance 處理本地topic
-list 過濾處理)
,而
4.2+ 版本肯定會topic
在 **PUB-side?Context
-instance 上執行 -list 過濾處理(CPU 使用量增長,網絡傳輸相反),所以你的 SUB-一方永遠不會傳遞一個字節的“無用”讀取“未訂閱”的消息。
2)(您可以,但是)不需要將“主題”分離到由此暗示的多幀消息的第一幀中。也許恰恰相反(在高性能、低延遲的分布式系統設計中這樣做是一種相當反模式。
主題過濾過程被定義并按字節工作,從左到右,每個主題列表成員值與傳遞的消息有效負載進行模式匹配。
添加額外的數據、額外的幀管理處理只會增加端到端延遲和處理開銷。這樣做從來都不是一個好主意,而不是正確的分布式系統設計工作。
尾聲:
職業比賽中沒有輕而易舉的勝利,也沒有唾手可得的成果分布式系統設計,如果越少低延遲或超低延遲是設計目標。
另一方面,請確保 ZeroMQ 框架是在考慮到這一點的情況下制定的,并且這些努力以穩定的、最終性能良好平衡的工具集為頂峰,這些工具集智能(設計)、快速(運行)和可擴展(因為地獄可能會這樣)由于這種設計智慧,人們喜歡使用正確的信號/消息服務。
希望您對 ZeroMQ 感到滿意,并隨意在您選擇的應用程序套件內的 ZeroMQ 層“前面”添加任何附加功能集。

TA貢獻1815條經驗 獲得超10個贊
發布者無法在沒有任何連接的情況下緩沖消息,它只會刪除任何新消息:
來自文檔:
如果發布者沒有連接的訂閱者,那么它將簡單地刪除所有消息。
這意味著您的緩沖區需要不受 Zeromq 的關注。您的緩沖區可以是列表、數據庫或您選擇的任何其他存儲方法,但您不能使用發布者來執行此操作。
現在下一個問題是處理如何檢測訂閱者已連接/斷開連接。這需要告訴我們何時需要開始從緩沖區讀取/填充緩沖區。
我建議使用Socket.monitor
和監聽 和ZMQ_EVENT_CONNECTED
,ZMQ_EVENT_DISCONNECTED
因為它們會告訴您客戶端何時連接/斷開連接,從而使您能夠切換到填充您選擇的緩沖區。當然,可能還有其他不直接涉及 Zeromq 的方法,但這由您決定。
添加回答
舉報