1 回答

TA貢獻2036條經驗 獲得超8個贊
解決方案:XPUB/XSUB 應該綁定(bind)到 socket,而 PUB/SUB worker 應該連接(connect)到 socket。
以下是可以正常運作的代碼:
package playground
import (
? ? zmq "github.com/pebbe/zmq4"
? ? "fmt"
? ? "log"
? ? "math/rand"
? ? "testing"
? ? "time"
)
func subscriber_thread(id int) {
? ? subscriber, err := zmq.NewSocket(zmq.SUB)
? ? if err != nil {
? ? ? ? panic(err)
? ? }
? ? err = subscriber.Connect("tcp://localhost:6001")
? ? if err != nil {
? ? ? ? panic(err)
? ? }
? ? err = subscriber.SetSubscribe("")
? ? if err != nil {
? ? ? ? panic(err)
? ? }
? ? defer subscriber.Close()
? ? for {
? ? ? ? msg, err := subscriber.RecvMessage(0)
? ? ? ? if err != nil {
? ? ? ? ? ? panic(err)
? ? ? ? }
? ? ? ? fmt.Println("subscriber id:", id, "received:", msg)
? ? }
}
func publisher_thread(n int) {
? ? publisher, err := zmq.NewSocket(zmq.PUB)
? ? if err != nil {
? ? ? ? panic(err)
? ? }
? ? //err = publisher.Bind("tcp://*:6000")
? ? err = publisher.Connect("tcp://localhost:6000")
? ? if err != nil {
? ? ? ? panic(err)
? ? }
? ? for {
? ? ? ? s := fmt.Sprintf("%c-%05d", n+'A', rand.Intn(100000))
? ? ? ? _, err := publisher.SendMessage(s)
? ? ? ? if err != nil {
? ? ? ? ? ? panic(err)
? ? ? ? }
? ? ? ? fmt.Println("publisher sent:", s)
? ? ? ? time.Sleep(100 * time.Millisecond) //? Wait for 1/10th second
? ? }
}
//? The listener receives all messages flowing through the proxy, on its
//? pipe. In CZMQ, the pipe is a pair of ZMQ_PAIR sockets that connects
//? attached child threads. In other languages your mileage may vary:
func listener_thread() {
? ? pipe, _ := zmq.NewSocket(zmq.PAIR)
? ? pipe.Bind("inproc://pipe")
? ? //? Print everything that arrives on pipe
? ? for {
? ? ? ? msg, err := pipe.RecvMessage(0)
? ? ? ? if err != nil {
? ? ? ? ? ? break //? Interrupted
? ? ? ? }
? ? ? ? fmt.Printf("%q\n", msg)
? ? }
}
func TestZmqEspresso(t *testing.T) {
? ? log.SetFlags(log.LstdFlags | log.Lmicroseconds | log.Lshortfile)
? ? go publisher_thread(0)
? ? go publisher_thread(1)
? ? go publisher_thread(2)
? ? go subscriber_thread(1)
? ? go subscriber_thread(2)
? ? go listener_thread()
? ? time.Sleep(100 * time.Millisecond)
? ? subscriber, err := zmq.NewSocket(zmq.XSUB)
? ? if err != nil {
? ? ? ? panic(err)
? ? }
? ? //err = subscriber.Connect("tcp://localhost:6000")
? ? err = subscriber.Bind("tcp://*:6000")
? ? if err != nil {
? ? ? ? panic(err)
? ? }
? ? publisher, err := zmq.NewSocket(zmq.XPUB)
? ? if err != nil {
? ? ? ? panic(err)
? ? }
? ? err = publisher.Bind("tcp://*:6001")
? ? if err != nil {
? ? ? ? panic(err)
? ? }
? ? listener, _ := zmq.NewSocket(zmq.PAIR)
? ? listener.Connect("inproc://pipe")
? ? err = zmq.Proxy(subscriber, publisher, listener)
? ? if err != nil {
? ? ? ? panic(err)
? ? }
? ? fmt.Println("interrupted")
}
- 1 回答
- 0 關注
- 273 瀏覽
添加回答
舉報