1 回答

TA貢獻1842條經驗 獲得超13個贊
在開始討論之前,我想強調一下 Azure 服務總線和 Celery 之間的差異。
Azure服務總線:
Microsoft Azure 服務總線是完全托管的企業集成消息代理。
芹菜 :
分布式任務隊列。Celery是一個基于分布式消息傳遞的異步任務隊列/作業隊列。
對于你的情況我可以想到兩種可能性:
您希望將 Service Bus 與 Celery 一起使用來代替其他消息代理。
用服務總線替換 Celery
1:您希望將 Service Bus 與 Celery 一起使用來代替其他消息代理。
2:完全用Service Bus替換Celery?以滿足您的要求:
考慮
消息發送者是生產者
消息接收者是消費者
這是您必須處理的兩個不同的應用程序。
解釋 :
每次您想要執行操作時,您都可以從生產者客戶端向主題發送消息。
消費者客戶端 - 正在偵聽的應用程序將接收消息并處理該消息。您可以將自定義流程附加到它 - 這樣,只要消費者客戶端收到消息,您的自定義流程就會執行。
以下是接收客戶端的示例:
from azure.servicebus.aio import SubscriptionClient
import asyncio
import nest_asyncio
nest_asyncio.apply()
? ? ? ??
Receiving = True
#Topic 1 receiver :?
conn_str= "<>"
name="Allmessages1"
SubsClient = SubscriptionClient.from_connection_string(conn_str, name)
receiver =? SubsClient.get_receiver()
async def receive_message_from1():
? ? await receiver.open()
? ? print("Opening the Receiver for Topic1")
? ? async with receiver:
? ? ? while(Receiving):
? ? ? ? msgs =? await receiver.fetch_next()
? ? ? ? for m in msgs:
? ? ? ? ? ? print("Received the message from topic 1.....")
? ? ? ? ? ? ##### - Your code to execute when a message is received - ########
? ? ? ? ? ? print(str(m))
? ? ? ? ? ? ##### - Your code to execute when a message is received - ########
? ? ? ? ? ? await m.complete()
? ? ? ? ? ??
? ? ? ? ? ??
loop = asyncio.get_event_loop()
topic1receiver = loop.create_task(receive_message_from1())
下面一行之間的部分是每次收到消息時將執行的指令。
##### - Your code to execute when a message is received - ########
添加回答
舉報