I have thousands of IoT devices deployed, all logging events to Azure IoT Hub, and I need to read only the events created by a single device ID.
I have been experimenting with EventProcessorHost to get this working, but so far I can only see a way to read all messages from all devices.
Reading every message and filtering client-side is not a feasible solution, as there may be millions of messages.
The main purpose of Azure IoT Hub is to ingest a massive volume of events from devices into a cloud stream pipeline for real-time analysis. The default telemetry path (the hot path) goes through a built-in Event Hub, where all events are temporarily stored in the Event Hub partitions.
Besides that default endpoint (events), IoT Hub can also route event messages to custom endpoints based on rules (conditions).
Note that the number of custom endpoints is limited to 10 and the number of rules to 100. If these limits fit your business model, you can very easily stream up to 10 devices individually, as described in Davis' answer.
However, splitting a telemetry stream pipeline by source (device) beyond this limit (10+1) requires additional Azure entities (components).
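As a rough illustration of such a rule, a per-device route in the hub's routing configuration might look like the fragment below. The route name and the endpoint name are hypothetical placeholders, and the condition assumes that the routing query language exposes the sending device's ID as $connectionDeviceId; check the routing query documentation for your hub before relying on it.

```json
{
  "name": "Device1Route",
  "source": "DeviceMessages",
  "condition": "$connectionDeviceId = 'Device1'",
  "endpointNames": ["device1-endpoint"],
  "isEnabled": true
}
```

Each device handled this way consumes one rule and, if it needs its own stream, one custom endpoint, which is why the approach stops scaling at the limits mentioned above.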
The following picture shows a solution for splitting a telemetry stream pipeline based on the devices using a Pub/Sub push model.
The above solution is based on forwarding the stream events to Azure Event Grid using a custom topic publisher. The event schema for Event Grid eventing is here.
The custom topic publisher for Event Grid is an Azure EventHubTrigger Function, which maps each stream event to an Event Grid event message whose subject identifies the registered device.
Azure Event Grid is a loosely coupled Pub/Sub model, where events are delivered to subscribers based on their subscriptions. In other words, if no subscription matches, the event message is dropped.
Note that Event Grid is capable of routing 10 million events per second per region. The number of subscriptions is limited to 1000 per region.
Subscriptions can be created, updated, and deleted dynamically through the REST API.
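For example, a per-device subscription could be created with a request body along the lines of the sketch below. It assumes the subject format produced by the function in this answer (/iothub/events/<message source>/<device id>), that telemetry messages carry the message source "Telemetry", and a hypothetical WebHook subscriber; the endpoint URL is a placeholder to replace with your own.

```json
{
  "properties": {
    "destination": {
      "endpointType": "WebHook",
      "properties": {
        "endpointUrl": "https://<your-subscriber-host>/api/<your-handler>"
      }
    },
    "filter": {
      "subjectBeginsWith": "/iothub/events/Telemetry/",
      "subjectEndsWith": "/Device1",
      "includedEventTypes": ["telemetryDataInserted"],
      "isSubjectCaseSensitive": false
    }
  }
}
```

Matching on the subject suffix rather than only on a prefix is deliberate here: a prefix ending in Device1 would also match Device10, Device11, and so on.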
The following code snippet shows an example of the Azure Function (AF) implementation that maps a stream event to an Event Grid (EG) event message. As you can see, the implementation is very straightforward:
run.csx:
    #r "Newtonsoft.Json"
    #r "Microsoft.ServiceBus"

    using System.Configuration;
    using System.Text;
    using System.Threading.Tasks;
    using Microsoft.Azure.EventGrid.Models;
    using Microsoft.ServiceBus.Messaging;
    using Newtonsoft.Json;

    // reusable client proxy, shared across function invocations
    static HttpClient client = HttpClientHelper.Client(ConfigurationManager.AppSettings["TopicEndpointEventGrid"], ConfigurationManager.AppSettings["aeg-sas-key"]);

    // AF
    public static async Task Run(EventData ed, TraceWriter log)
    {
        log.Info($"C# Event Hub trigger function processed a message:{ed.SequenceNumber}");

        // fire EventGrid Custom Topic; the subject carries the message source and the device id
        var egevent = new EventGridEvent()
        {
            Id = ed.SequenceNumber.ToString(),
            Subject = $"/iothub/events/{SysProp(ed, "iothub-message-source")}/{SysProp(ed, "iothub-connection-device-id")}",
            EventType = "telemetryDataInserted",
            EventTime = ed.EnqueuedTimeUtc,
            Data = new
            {
                sysproperties = ed.SystemProperties,
                properties = ed.Properties,
                body = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(ed.GetBytes()))
            }
        };

        // the topic endpoint expects an array of events
        await client.PostAsJsonAsync("", new[] { egevent });
    }

    // helper: the dictionary indexer throws on a missing key, so fall back to "?" explicitly
    static string SysProp(EventData ed, string key)
    {
        object value;
        return ed.SystemProperties.TryGetValue(key, out value) && value != null ? value.ToString() : "?";
    }

    // helper: creates an HttpClient that authenticates with the topic key
    class HttpClientHelper
    {
        public static HttpClient Client(string address, string key)
        {
            var client = new HttpClient() { BaseAddress = new Uri(address) };
            client.DefaultRequestHeaders.Add("aeg-sas-key", key);
            return client;
        }
    }
function.json:
    {
      "bindings": [
        {
          "type": "eventHubTrigger",
          "name": "ed",
          "direction": "in",
          "path": "<yourEventHubName>",
          "connection": "<yourIoTHUB>",
          "consumerGroup": "<yourGroup>",
          "cardinality": "many"
        }
      ],
      "disabled": false
    }
project.json:
    {
      "frameworks": {
        "net46": {
          "dependencies": {
            "Microsoft.Azure.EventGrid": "1.1.0-preview"
          }
        }
      }
    }
Finally, the following screenshot shows an Event Grid event message received by the AF subscriber for Device1:
If you're OK with Java/Scala, this example shows how to create a client and filter messages by device ID:
https://github.com/Azure/toketi-iothubreact/blob/master/samples-scala/src/main/scala/A_APIUSage/Demo.scala#L266
Note, though, that the underlying client still reads all the messages from the hub.
You could also consider using IoT Hub message routing; more info here:
https://azure.microsoft.com/blog/azure-iot-hub-message-routing-enhances-device-telemetry-and-optimizes-iot-infrastructure-resources
https://azure.microsoft.com/blog/iot-hub-message-routing-now-with-routing-on-message-body