2 回答

TA貢獻1817條經驗 獲得超14個贊
發布失敗并出現以下錯誤:Failed to publish: Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering。
如果您更改發布調用以檢查錯誤,您可以看到這一點:
res1 := topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang1!"), OrderingKey: "foobar"})
res2 := topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang2!"), OrderingKey: "foobar"})
_, err = res1.Get(ctx)
if err != nil {
fmt.Printf("Failed to publish: %v", err)
return
}
_, err = res2.Get(ctx)
if err != nil {
fmt.Printf("Failed to publish: %v", err)
return
}
要修復它,請添加一行以啟用主題的消息排序。您的主題創建如下:
topic, err := client.CreateTopic(context.Background(), topicID)
if err != nil {
log.Fatalf("CreateTopic: %v", err)
}
topic.EnableMessageOrdering = true
defer topic.Delete(context.Background())

TA貢獻1815條經驗 獲得超13個贊
我獨立想出了與 Kamal 相同的解決方案,只是想分享完整的修改后的實現:
package main
import (
"context"
"flag"
"log"
"time"
"cloud.google.com/go/pubsub"
uuid "github.com/satori/go.uuid"
)
var enableMessageOrdering bool
func main() {
flag.BoolVar(&enableMessageOrdering, "enableMessageOrdering", false, "Enable and use Pub/Sub message ordering")
flag.Parse()
client, err := pubsub.NewClient(context.Background(), "fleetsmith-dev")
if err != nil {
log.Fatalf("NewClient: %v", err)
}
topicID := "test-topic-" + uuid.NewV4().String()
topic, err := client.CreateTopic(context.Background(), topicID)
if err != nil {
log.Fatalf("CreateTopic: %v", err)
}
topic.EnableMessageOrdering = enableMessageOrdering
defer topic.Delete(context.Background())
subID := "test-subscription-" + uuid.NewV4().String()
sub, err := client.CreateSubscription(context.Background(), subID, pubsub.SubscriptionConfig{
Topic: topic,
EnableMessageOrdering: enableMessageOrdering,
})
if err != nil {
log.Fatalf("CreateSubscription: %v", err)
}
defer sub.Delete(context.Background())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
messageReceived := make(chan struct{})
go sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
log.Printf("Received message with ordering key %s: %s", msg.OrderingKey, msg.Data)
msg.Ack()
messageReceived <- struct{}{}
})
msg1, msg2 := &pubsub.Message{Data: []byte("Dang1!")}, &pubsub.Message{Data: []byte("Dang2!")}
if enableMessageOrdering {
msg1.OrderingKey, msg2.OrderingKey = "foobar", "foobar"
}
publishMessage(topic, msg1)
publishMessage(topic, msg2)
for i := 0; i < 2; i++ {
select {
case <-messageReceived:
case <-time.After(10 * time.Second):
log.Fatal("Expected to receive a message, but timed out after 10 seconds.")
}
}
}
func publishMessage(topic *pubsub.Topic, msg *pubsub.Message) {
publishResult := topic.Publish(context.Background(), msg)
messageID, err := publishResult.Get(context.Background())
if err != nil {
log.Fatalf("Get: %v", err)
}
log.Printf("Published message with ID %s", messageID)
}
當enableMessageOrdering標志設置為調用時true,我首先收到Dang1!,然后是Dang2!:
> go run main.go --enableMessageOrdering
2020/08/11 05:38:07 Published message with ID 1420685949616723
2020/08/11 05:38:08 Published message with ID 1420726763302425
2020/08/11 05:38:09 Received message with ordering key foobar: Dang1!
2020/08/11 05:38:11 Received message with ordering key foobar: Dang2!
而沒有它,我會像以前一樣以相反的順序收到它們:
> go run main.go
2020/08/11 05:38:47 Published message with ID 1420687395091051
2020/08/11 05:38:47 Published message with ID 1420693737065665
2020/08/11 05:38:48 Received message with ordering key : Dang2!
2020/08/11 05:38:48 Received message with ordering key : Dang1!
- 2 回答
- 0 關注
- 152 瀏覽
添加回答
舉報