亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Google Pub/Sub 消息排序不起作用(或將延遲增加到 10 秒以上)?

Google Pub/Sub 消息排序不起作用(或將延遲增加到 10 秒以上)?

Go
慕的地8271018 2022-06-21 16:46:11
我正在嘗試制作一個簡化示例來演示如何使用 Google Pub/Sub 的消息排序功能 ( https://cloud.google.com/pubsub/docs/ordering )。從這些文檔中,為訂閱啟用消息排序后,設置消息排序屬性后,Pub/Sub 服務會按照 Pub/Sub 服務接收消息的順序下發具有相同 ordering key 的消息。例如,如果發布者使用相同的排序鍵發送兩條消息,則 Pub/Sub 服務首先傳遞最舊的消息。我用它來編寫以下示例:package mainimport (    "context"    "log"    "time"    "cloud.google.com/go/pubsub"    uuid "github.com/satori/go.uuid")func main() {    client, err := pubsub.NewClient(context.Background(), "my-project")    if err != nil {        log.Fatalf("NewClient: %v", err)    }    topicID := "test-topic-" + uuid.NewV4().String()    topic, err := client.CreateTopic(context.Background(), topicID)    if err != nil {        log.Fatalf("CreateTopic: %v", err)    }    defer topic.Delete(context.Background())    subID := "test-subscription-" + uuid.NewV4().String()    sub, err := client.CreateSubscription(context.Background(), subID, pubsub.SubscriptionConfig{        Topic:                 topic,        EnableMessageOrdering: true,    })    if err != nil {        log.Fatalf("CreateSubscription: %v", err)    }    defer sub.Delete(context.Background())    ctx, cancel := context.WithCancel(context.Background())    defer cancel()    messageReceived := make(chan struct{})    go sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {        log.Printf("Received message with ordering key %s: %s", msg.OrderingKey, msg.Data)        msg.Ack()        messageReceived <- struct{}{}    })首先,我嘗試了該程序,但沒有OrderingKey: "foobar"在topic.Publish()調用中指定。這導致以下輸出:> go run main.go2020/08/10 21:40:34 Received message with ordering key : Dang2!2020/08/10 21:40:34 Received message with ordering key : Dang1!換句話說,消息的接收順序與發布的順序不同,這在我的用例中是不可取的,我想通過指定OrderingKey但是,一旦我OrderingKey在發布調用中添加了 s,程序在等待接收 Pub/Sub 消息 10 秒后就會超時:> go run main.go2020/08/10 21:44:36 Expected to receive a message, but timed out after 10 seconds.exit status 1我期望現在首先收到消息,Dang1!然后是Dang2!,但我沒有收到任何消息。知道為什么這沒有發生嗎?
查看完整描述

2 回答

?
大話西游666

TA貢獻1817條經驗 獲得超14個贊

發布失敗并出現以下錯誤:Failed to publish: Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering。


如果您更改發布調用以檢查錯誤,您可以看到這一點:


res1 := topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang1!"), OrderingKey: "foobar"})

res2 := topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang2!"), OrderingKey: "foobar"})


_, err = res1.Get(ctx)

if err != nil {

    fmt.Printf("Failed to publish: %v", err)

    return

}


_, err = res2.Get(ctx)

if err != nil {

    fmt.Printf("Failed to publish: %v", err)

    return

}

要修復它,請添加一行以啟用主題的消息排序。您的主題創建如下:


topic, err := client.CreateTopic(context.Background(), topicID)

if err != nil {

    log.Fatalf("CreateTopic: %v", err)

}

topic.EnableMessageOrdering = true

defer topic.Delete(context.Background())


查看完整回答
反對 回復 2022-06-21
?
蕭十郎

TA貢獻1815條經驗 獲得超13個贊

我獨立想出了與 Kamal 相同的解決方案,只是想分享完整的修改后的實現:


package main


import (

    "context"

    "flag"

    "log"

    "time"


    "cloud.google.com/go/pubsub"

    uuid "github.com/satori/go.uuid"

)


var enableMessageOrdering bool


func main() {

    flag.BoolVar(&enableMessageOrdering, "enableMessageOrdering", false, "Enable and use Pub/Sub message ordering")

    flag.Parse()


    client, err := pubsub.NewClient(context.Background(), "fleetsmith-dev")

    if err != nil {

        log.Fatalf("NewClient: %v", err)

    }


    topicID := "test-topic-" + uuid.NewV4().String()

    topic, err := client.CreateTopic(context.Background(), topicID)

    if err != nil {

        log.Fatalf("CreateTopic: %v", err)

    }

    topic.EnableMessageOrdering = enableMessageOrdering

    defer topic.Delete(context.Background())


    subID := "test-subscription-" + uuid.NewV4().String()

    sub, err := client.CreateSubscription(context.Background(), subID, pubsub.SubscriptionConfig{

        Topic:                 topic,

        EnableMessageOrdering: enableMessageOrdering,

    })

    if err != nil {

        log.Fatalf("CreateSubscription: %v", err)

    }

    defer sub.Delete(context.Background())


    ctx, cancel := context.WithCancel(context.Background())

    defer cancel()


    messageReceived := make(chan struct{})

    go sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {

        log.Printf("Received message with ordering key %s: %s", msg.OrderingKey, msg.Data)

        msg.Ack()

        messageReceived <- struct{}{}

    })


    msg1, msg2 := &pubsub.Message{Data: []byte("Dang1!")}, &pubsub.Message{Data: []byte("Dang2!")}

    if enableMessageOrdering {

        msg1.OrderingKey, msg2.OrderingKey = "foobar", "foobar"

    }

    publishMessage(topic, msg1)

    publishMessage(topic, msg2)


    for i := 0; i < 2; i++ {

        select {

        case <-messageReceived:

        case <-time.After(10 * time.Second):

            log.Fatal("Expected to receive a message, but timed out after 10 seconds.")

        }

    }

}


func publishMessage(topic *pubsub.Topic, msg *pubsub.Message) {

    publishResult := topic.Publish(context.Background(), msg)

    messageID, err := publishResult.Get(context.Background())

    if err != nil {

        log.Fatalf("Get: %v", err)

    }

    log.Printf("Published message with ID %s", messageID)

}

當enableMessageOrdering標志設置為調用時true,我首先收到Dang1!,然后是Dang2!:


> go run main.go --enableMessageOrdering

2020/08/11 05:38:07 Published message with ID 1420685949616723

2020/08/11 05:38:08 Published message with ID 1420726763302425

2020/08/11 05:38:09 Received message with ordering key foobar: Dang1!

2020/08/11 05:38:11 Received message with ordering key foobar: Dang2!

而沒有它,我會像以前一樣以相反的順序收到它們:


> go run main.go

2020/08/11 05:38:47 Published message with ID 1420687395091051

2020/08/11 05:38:47 Published message with ID 1420693737065665

2020/08/11 05:38:48 Received message with ordering key : Dang2!

2020/08/11 05:38:48 Received message with ordering key : Dang1!


查看完整回答
反對 回復 2022-06-21
  • 2 回答
  • 0 關注
  • 152 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號