亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

ZMQ 無法接收來自多個發布者的消息

ZMQ 無法接收來自多個發布者的消息

Go
揚帆大魚 2023-05-22 17:19:10
我正在實施 ZMQ 的濃縮咖啡模式。我想連接很多訂閱者 <> 代理 <> 許多發布者但是,代理中的偵聽器僅接收來自一個發布者的消息。因此,訂閱者只能從那個特定的發布者那里接收。我不知道我的代碼有什么問題。package playgroundimport (    zmq "github.com/pebbe/zmq4"    "fmt"    "math/rand"    "time"    "testing")func subscriber_thread(id int) {    subscriber, _ := zmq.NewSocket(zmq.SUB)    subscriber.Connect("tcp://localhost:6001")    subscriber.SetSubscribe("")    defer subscriber.Close()    for {        msg, err := subscriber.RecvMessage(0)        if err != nil {            panic(err)        }        fmt.Println("subscriber id:", id,"received:", msg)    }}func publisher_thread(n int) {    publisher, _ := zmq.NewSocket(zmq.PUB)    publisher.Bind("tcp://*:6000")    for {        s := fmt.Sprintf("%c-%05d", n +'A', rand.Intn(100000))        _, err := publisher.SendMessage(s)        if err != nil {            panic(err)        }        fmt.Println("publisher sent:", s)        time.Sleep(100 * time.Millisecond) //  Wait for 1/10th second    }}//  The listener receives all messages flowing through the proxy, on its//  pipe. In CZMQ, the pipe is a pair of ZMQ_PAIR sockets that connects//  attached child threads. In other languages your mileage may vary:func listener_thread() {    pipe, _ := zmq.NewSocket(zmq.PAIR)    pipe.Bind("inproc://pipe")    //  Print everything that arrives on pipe    for {        msg, err := pipe.RecvMessage(0)        if err != nil {            break //  Interrupted        }        fmt.Printf("%q\n", msg)    }}func TestZmqEspresso(t *testing.T) {    go publisher_thread(0)    go publisher_thread(1)    go publisher_thread(2)    go subscriber_thread(1)    go subscriber_thread(2)    go listener_thread()    time.Sleep(100 * time.Millisecond)    subscriber, _ := zmq.NewSocket(zmq.XSUB)    subscriber.Connect("tcp://localhost:6000")    publisher, _ := zmq.NewSocket(zmq.XPUB)    publisher.Bind("tcp://*:6001")    listener, _ := zmq.NewSocket(zmq.PAIR)    listener.Connect("inproc://pipe")    zmq.Proxy(subscriber, publisher, listener)    fmt.Println("interrupted")}
查看完整描述

1 回答

?
慕桂英3389331

TA貢獻2036條經驗 獲得超8個贊

解決方案:XPUB/XSUB 應該綁定到 socket PUB,SUB worker 應該連接到 socket


下面的工作代碼


package playground


import (

? ? zmq "github.com/pebbe/zmq4"


? ? "fmt"

? ? "log"

? ? "math/rand"

? ? "testing"

? ? "time"

)


func subscriber_thread(id int) {

? ? subscriber, err := zmq.NewSocket(zmq.SUB)

? ? if err != nil {

? ? ? ? panic(err)

? ? }

? ? err = subscriber.Connect("tcp://localhost:6001")

? ? if err != nil {

? ? ? ? panic(err)

? ? }

? ? err = subscriber.SetSubscribe("")

? ? if err != nil {

? ? ? ? panic(err)

? ? }

? ? defer subscriber.Close()


? ? for {

? ? ? ? msg, err := subscriber.RecvMessage(0)

? ? ? ? if err != nil {

? ? ? ? ? ? panic(err)

? ? ? ? }

? ? ? ? fmt.Println("subscriber id:", id, "received:", msg)

? ? }

}


func publisher_thread(n int) {

? ? publisher, err := zmq.NewSocket(zmq.PUB)

? ? if err != nil {

? ? ? ? panic(err)

? ? }

? ? //err = publisher.Bind("tcp://*:6000")

? ? err = publisher.Connect("tcp://localhost:6000")

? ? if err != nil {

? ? ? ? panic(err)

? ? }


? ? for {

? ? ? ? s := fmt.Sprintf("%c-%05d", n+'A', rand.Intn(100000))

? ? ? ? _, err := publisher.SendMessage(s)

? ? ? ? if err != nil {

? ? ? ? ? ? panic(err)

? ? ? ? }

? ? ? ? fmt.Println("publisher sent:", s)

? ? ? ? time.Sleep(100 * time.Millisecond) //? Wait for 1/10th second

? ? }

}


//? The listener receives all messages flowing through the proxy, on its

//? pipe. In CZMQ, the pipe is a pair of ZMQ_PAIR sockets that connects

//? attached child threads. In other languages your mileage may vary:


func listener_thread() {

? ? pipe, _ := zmq.NewSocket(zmq.PAIR)

? ? pipe.Bind("inproc://pipe")


? ? //? Print everything that arrives on pipe

? ? for {

? ? ? ? msg, err := pipe.RecvMessage(0)

? ? ? ? if err != nil {

? ? ? ? ? ? break //? Interrupted

? ? ? ? }

? ? ? ? fmt.Printf("%q\n", msg)

? ? }

}


func TestZmqEspresso(t *testing.T) {

? ? log.SetFlags(log.LstdFlags | log.Lmicroseconds | log.Lshortfile)


? ? go publisher_thread(0)

? ? go publisher_thread(1)

? ? go publisher_thread(2)


? ? go subscriber_thread(1)

? ? go subscriber_thread(2)


? ? go listener_thread()


? ? time.Sleep(100 * time.Millisecond)


? ? subscriber, err := zmq.NewSocket(zmq.XSUB)

? ? if err != nil {

? ? ? ? panic(err)

? ? }

? ? //err = subscriber.Connect("tcp://localhost:6000")

? ? err = subscriber.Bind("tcp://*:6000")

? ? if err != nil {

? ? ? ? panic(err)

? ? }


? ? publisher, err := zmq.NewSocket(zmq.XPUB)

? ? if err != nil {

? ? ? ? panic(err)

? ? }

? ? err = publisher.Bind("tcp://*:6001")

? ? if err != nil {

? ? ? ? panic(err)

? ? }


? ? listener, _ := zmq.NewSocket(zmq.PAIR)

? ? listener.Connect("inproc://pipe")


? ? err = zmq.Proxy(subscriber, publisher, listener)

? ? if err != nil {

? ? ? ? panic(err)

? ? }


? ? fmt.Println("interrupted")


}


查看完整回答
反對 回復 2023-05-22
  • 1 回答
  • 0 關注
  • 273 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號