2 回答

TA貢獻1802條經驗 獲得超5個贊
有兩種方法可以將數據發送到 Azure 事件中心,即 HTTP REST API 和 AMQP 1.0 協議。
對于使用 HTTP REST API 或Azure EventHub Python 客戶端庫,只有partitionId
參數支持向 Event Hub 中的指定分區發送新事件,如下兩個。
REST API
Send partition event
需要partitionId
endpoint中的參數https://{servicebusNamespace}.servicebus.windows.net/{eventHubPath}/partitions/{partitionId}/messages
,它是唯一一個支持發送分區特性的REST API。的源代碼注釋
Sender.py
解釋partition
參數如下。:param partition: The specific partition ID to send to. Default is None, in which case the service will assign to all partitions using round-robin.:type partition: str
所以實際上,您不能使用partitionKey
value 將事件發送到指定的 EventHub 分區,除非在 Python 中使用 AMQP 1.0。使用AMQP 1.0,請查看官方文檔AMQP 1.0 in Azure Service Bus and Event Hubs protocol guide
并partition-key
在頁面上搜索,結果如下。

TA貢獻1815條經驗 獲得超13個贊
我不太確定,但使用 python ,這是打開連接的方法
def open(self):
"""
Open the Sender using the supplied conneciton.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.
:param connection: The underlying client shared connection.
:type: connection: ~uamqp.connection.Connection
"""
self.running = True
if self.redirected:
self.target = self.redirected.address
self._handler = SendClient(
self.target,
auth=self.client.get_auth(),
debug=self.client.debug,
msg_timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties())
self._handler.open()
while not self._handler.client_ready():
time.sleep(0.05)
這是初始化
def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True):
"""
Instantiate an EventHub event Sender handler.
:param client: The parent EventHubClient.
:type client: ~azure.eventhub.client.EventHubClient.
:param target: The URI of the EventHub to send to.
:type target: str
:param partition: The specific partition ID to send to. Default is None, in which case the service
will assign to all partitions using round-robin.
:type partition: str
:param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is
queued. Default value is 60 seconds. If set to 0, there will be no timeout.
:type send_timeout: int
:param keep_alive: The time interval in seconds between pinging the connection to keep it alive during
periods of inactivity. The default value is None, i.e. no keep alive pings.
:type keep_alive: int
:param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs.
Default value is `True`.
:type auto_reconnect: bool
"""
self.running = False
self.client = client
self.target = target
self.partition = partition
self.timeout = send_timeout
self.redirected = None
self.error = None
self.keep_alive = keep_alive
self.auto_reconnect = auto_reconnect
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
self.reconnect_backoff = 1
self.name = "EHSender-{}".format(uuid.uuid4())
if partition:
self.target += "/Partitions/" + partition
self.name += "-partition{}".format(partition)
self._handler = SendClient(
self.target,
auth=self.client.get_auth(),
debug=self.client.debug,
msg_timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties())
self._outcome = None
self._condition = None
我相信,下面的函數行只會創建一個分區發送者
if partition:
self.target += "/Partitions/" + partition
self.name += "-partition{}".format(partition)
參考
https://github.com/Azure/azure-event-hubs-python/blob/master/azure/eventhub/sender.py
希望能幫助到你。
添加回答
舉報