I'm trying to send messages to Azure Event Hub using python and the rest API after some failed experiments i have found working code (see below) but i want to be able to select to which partition to send the event.
Is this possible using the rest API and if so how should be done?
import json
from datetime import datetime
from multiprocessing import Pool
# from azure.servicebus import _service_bus_error_handler
from azure.servicebus.servicebusservice import ServiceBusService, ServiceBusSASAuthentication
from azure.http import (
from azure.http.httpclient import _HTTPClient
EVENT_HUB_HOST = "mysecrethub.servicebus.windows.net"
EVENT_HUB_NAME = "secerthub-name"
KEYNAME = "senderkey" # needs to be loaded from ENV
KEYVALUE = "keyvalue" # needs to be loaded from ENV
class EventHubClient(object):
def __init__(self, host, hubname, keyname, keyvalue):
self._host = host
self._hub = hubname
self._keyname = keyname
self._key = keyvalue
def sendMessage(self, body, partition=None, additional_headers=None):
eventHubHost = self._host
httpclient = _HTTPClient(service_instance=self)
sasKeyName = self._keyname
sasKeyValue = self._key
authentication = ServiceBusSASAuthentication(sasKeyName, sasKeyValue)
request = HTTPRequest()
request.method = "POST"
request.host = eventHubHost
request.protocol_override = "https"
request.path = "/%s/messages?api-version=2014-01" % (self._hub)
request.body = body
request.headers.append(('Content-Type', 'application/atom+xml;type=entry;charset=utf-8'))
if additional_headers is not None:
for item in additional_headers:
if partition is not None:
value = json.dumps({'PartitionKey': partition})
request.headers.append(('BrokerProperties', value))
authentication.sign_request(request, httpclient)
request.headers.append(('Content-Length', str(len(request.body))))
status = 0
resp = httpclient.perform_request(request)
status = resp.status
except HTTPError as ex:
status = ex.status
# print request.headers
return status
def prepare_message(appid, sessionid, partitionKey=None, SessionEllapsed=None, DeviceOs=None):
message = {"Name": "MonitorEvent"}
Attributes = {"AppId": appid, "SessionStarted": "".join(str(datetime.now())[:-3])}
if SessionEllapsed is not None:
Attributes['SessionEllapsed'] = SessionEllapsed
if DeviceOs is not None:
Attributes['DeviceOs'] = DeviceOs
if partitionKey is not None:
message["PartitionKey"] = str(partitionKey)
message["PartitionId"] = str(partitionKey)
Attributes['ItemId'] = partitionKey
message['Attributes'] = Attributes
return json.dumps(message)
def send_monitoring_event(partition):
appid = 1
sendertime = datetime.now().strftime('%Y%M%d-%H%M%S')
message = prepare_message(appid, sendertime, partitionKey=partition, SessionEllapsed=1, DeviceOs='Monitor' + str(partition))
# print message
hubStatus = hubClient.sendMessage(message, partition=None, additional_headers=EXTRA_HEADERS)
# return the HTTP status to the caller
return hubStatus
def main():
pool = Pool(processes=NUM_OF_PARTITIONS)
print pool.map(send_monitoring_event, range(NUM_OF_PARTITIONS))
if __name__ == '__main__':
Following the section 'Send Event' of Event Hubs REST APIs docunment https://msdn.microsoft.com/en-us/library/azure/dn790664.aspx, you can't use the Request URI https://{serviceNamespace}.servicebus.windows.net/{eventHubPath}/messages to select to which partition to send events.
You should use the Request URI https://{serviceNamespace}.servicebus.windows.net/{eventHubPath}/publishers/{deviceId}/messages. The attribute {deviceId} is partition key what used to group/partition devices—whether it is geo-location, device type, version, tenant, and so on.
But the partition count must be a number between 2 and 32. So if you need to use more than 32 partitions, I suggest to put the key into the event data.
Best Regards.