Event Routing
AgeDigitalTwins supports real-time event routing to various external systems when digital twins, relationships, or models are created, updated, or deleted, and when telemetry data is published. This enables you to build reactive systems, data pipelines, and analytics solutions.
Overview
Event routing in AgeDigitalTwins works similarly to Azure Digital Twins, with the following key features:
- Real-time streaming: Events are captured and routed in near real-time using PostgreSQL logical replication and NOTIFY/LISTEN
- CloudEvents format: All events conform to the CloudEvents specification
- Multiple sinks: Route events to Kafka, Azure Data Explorer (Kusto), MQTT, and more
- Event filtering: Configure which events are routed to which sinks
- Dual event types: Lifecycle events (durable) and telemetry events (real-time)
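Putting these together, routing is driven by two pieces of configuration: event sinks (where events can go) and event routes (which events go where). The fragment below is only an illustrative preview; the sink name and connection details are placeholders, and the exact placement of these keys in your application settings is described under Configuration below.

eventSinks:
  kafka:
    - name: KafkaSink                 # illustrative sink name
      brokerList: your-kafka-broker:9092
      topic: digitaltwins-events
eventRoutes:
  - eventType: Telemetry              # send telemetry events to the Kafka sink
    sink: KafkaSink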
Event Types
AgeDigitalTwins generates several types of events:
Lifecycle Events
Captured through PostgreSQL logical replication (durable and replicated):
- Twin Events: TwinCreate, TwinUpdate, TwinDelete
- Relationship Events: RelationshipCreate, RelationshipUpdate, RelationshipDelete
- Model Events: ModelCreate, ModelUpdate, ModelDelete
Telemetry Events
Published through PostgreSQL NOTIFY/LISTEN (real-time, ephemeral):
- Twin Telemetry: Telemetry - general telemetry data for a twin
- Component Telemetry: ComponentTelemetry - component-specific telemetry data
Event Architecture
The event system uses a dual approach for optimal performance: lifecycle events are captured through PostgreSQL logical replication, while telemetry events are published through NOTIFY/LISTEN.
This architecture ensures:
- Lifecycle events are durable and never lost (important for maintaining state consistency)
- Telemetry events are delivered with minimal latency (important for real-time monitoring)
- Unified processing through a shared event queue and consumer
Supported Event Sinks
Kafka / Azure Event Hubs
Stream events to Apache Kafka or Azure Event Hubs for real-time processing and integration with downstream systems. Perfect for:
- Real-time analytics and stream processing
- Integration with microservices architectures
- Event-driven workflows
- High-throughput scenarios
Azure Data Explorer (Kusto)
Send events directly to Azure Data Explorer for analytics and time-series analysis, bypassing the need for intermediate Event Hubs. Ideal for:
- Historical data analysis and reporting
- Time-series analytics on telemetry data
- Data exploration and visualization
- Long-term data retention
MQTT
Route events to MQTT brokers for lightweight messaging and IoT scenarios. Uses structured CloudEvents format over MQTT. Great for:
- IoT device integration
- Lightweight pub/sub messaging
- Edge computing scenarios
- Mobile and connected device applications
Event Routing Configuration
Basic Event Routes
Define which events go to which sinks using event routes:
eventRoutes:
  # Route all lifecycle events to Kusto for analytics
  - eventType: TwinCreate
    sink: AdxSink
  - eventType: TwinUpdate
    sink: AdxSink
  - eventType: TwinDelete
    sink: AdxSink
  # Route telemetry to Kafka for real-time processing
  - eventType: Telemetry
    sink: KafkaSink
  - eventType: ComponentTelemetry
    sink: KafkaSink
  # Route all events to MQTT for IoT integration
  - eventType: "*"
    sink: MqttSink
Advanced Routing Patterns
You can create multiple routes for the same event type to send to different sinks:
eventRoutes:
  # Send telemetry to both real-time and analytical sinks
  - eventType: Telemetry
    sink: KafkaSink # For real-time processing
  - eventType: Telemetry
    sink: AdxSink # For historical analysis
  # Route component telemetry differently based on use case
  - eventType: ComponentTelemetry
    sink: InfluxDbSink # Time-series database
  - eventType: ComponentTelemetry
    sink: MqttSink # IoT device notifications
Configuration
Configure event routing in your application settings. The configuration has the following structure:
events:
  config:
    eventSinks:
      kafka:
        - name: KafkaSink
          brokerList: your-eventhub-namespace.servicebus.windows.net
          topic: your-eventhub-name
          saslMechanism: OAUTHBEARER
      kusto:
        - name: AdxSink
          ingestionUri: https://ingest-your-cluster.region.kusto.windows.net
          database: your-database
      mqtt:
        - name: MqttSink
          broker: your-mqtt-broker-host
          port: 1883
          topic: digitaltwins/events
          clientId: agedt-client
          username: your-username
          password: your-password
Federated Authentication for Event Sinks (Kusto & Event Hubs)
AgeDigitalTwins uses DefaultAzureCredential for authenticating to Azure services like Kusto (ADX) and Event Hubs (via Kafka). For secure, automated integration, we recommend using Azure AD Workload Identity Federation (OIDC) with a service principal.
1. Create a Service Principal in Your Azure Tenant
You (the customer) should create a service principal (app registration) in your own Azure AD tenant:
az ad app create --display-name "AgeDigitalTwins Event Sink"
az ad sp create --id <appId>
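If you prefer a fully scripted flow, you can capture the appId from the first command and reuse it in the later steps. This is just a sketch; the display name is an example:

appId=$(az ad app create --display-name "AgeDigitalTwins Event Sink" --query appId -o tsv)
az ad sp create --id "$appId"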
Or via the portal:
- Go to Azure Active Directory → App registrations → New registration
2. Federate the Service Principal with AgeDigitalTwins
We will provide you with the OIDC issuer URL for our cluster (e.g., https://<your-agedt-cluster>/oidc).
Add a federated credential to your app registration:
- Go to your app registration in Azure Portal
- Select Certificates & secrets → Federated credentials → Add credential
- Set the Issuer to the OIDC issuer URL we provide
- Set the Subject to the workload identity you want to allow (e.g., system:serviceaccount:<namespace>:<serviceaccount>)
- Save
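The same federated credential can also be added from the CLI. The sketch below is illustrative: the credential name, issuer, and subject are placeholders you replace with the values described above.

az ad app federated-credential create --id <appId> --parameters '{
  "name": "agedt-event-sink",
  "issuer": "https://<your-agedt-cluster>/oidc",
  "subject": "system:serviceaccount:<namespace>:<serviceaccount>",
  "audiences": ["api://AzureADTokenExchange"]
}'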
3. Assign Roles to the Service Principal
- For Kusto: Assign the service principal the Contributor or Ingestor role on your Kusto cluster/database.
- For Event Hubs: Assign the service principal the Azure Event Hubs Data Sender role on the Event Hub namespace.
Example (Kusto):
az kusto database-principal-assignment create \
  --cluster-name <cluster> \
  --database-name <db> \
  --principal-id <servicePrincipalObjectId> \
  --role Ingestor \
  --principal-type App
Example (Event Hubs):
az role assignment create \
  --assignee <servicePrincipalObjectId> \
  --role "Azure Event Hubs Data Sender" \
  --scope /subscriptions/<sub>/resourceGroups/<rg>/providers/Microsoft.EventHub/namespaces/<namespace>
4. Configure AgeDigitalTwins
No secrets or credentials need to be stored in AgeDigitalTwins. The platform will use the federated identity to obtain tokens via DefaultAzureCredential.
Example event sink config:
eventSinks:
  kusto:
    - name: AdxSink
      ingestionUri: https://ingest-your-cluster.region.kusto.windows.net
      database: your-database
  kafka:
    - name: KafkaSink
      brokerList: your-eventhub-namespace.servicebus.windows.net
      topic: your-eventhub-name
      saslMechanism: OAUTHBEARER
Event Routes
You can define event routes to control which events go to which sinks. Example:
eventRoutes:
  - eventType: TwinCreate
    sink: AdxSink
  - eventType: TwinUpdate
    sink: KafkaSink
Migration Note
If you previously used managed identities or connection strings in Azure Digital Twins, you now use federated credentials for secure, passwordless authentication.
CloudEvents Format
All events in AgeDigitalTwins follow the CloudEvents specification, providing a standardized format for event data across different systems.
Event Structure
Every event includes these standard CloudEvents attributes:
- specversion: CloudEvents specification version (always "1.0")
- type: Event type (e.g., "Konnektr.DigitalTwins.Twin.Create")
- source: Source URI of the AgeDigitalTwins instance
- id: Unique identifier for the event
- time: Timestamp when the event occurred
- subject: The entity that the event is about (twin ID, relationship ID, etc.)
- datacontenttype: Content type of the event data (always "application/json")
- data: The actual event payload
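As an illustration, a twin update event could look roughly like the following. The field values, and especially the exact shape of the data payload, are made up for the example:

{
  "specversion": "1.0",
  "type": "Konnektr.DigitalTwins.Twin.Update",
  "source": "https://your-agedt-instance",
  "id": "5f2a7c2e-0d6b-4e0a-9c3f-1b2d3e4f5a6b",
  "time": "2024-01-01T12:00:00Z",
  "subject": "thermostat-01",
  "datacontenttype": "application/json",
  "data": {
    "patch": [
      { "op": "replace", "path": "/temperature", "value": 21.5 }
    ]
  }
}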
Event Types
Event Type | Description | Trigger |
---|---|---|
Konnektr.DigitalTwins.Twin.Create | Digital twin created | Twin creation via API |
Konnektr.DigitalTwins.Twin.Update | Digital twin updated | Twin or component updates |
Konnektr.DigitalTwins.Twin.Delete | Digital twin deleted | Twin deletion via API |
Konnektr.DigitalTwins.Relationship.Create | Relationship created | Relationship creation |
Konnektr.DigitalTwins.Relationship.Update | Relationship updated | Relationship property updates |
Konnektr.DigitalTwins.Relationship.Delete | Relationship deleted | Relationship deletion |
Konnektr.DigitalTwins.Telemetry | Telemetry published | Twin telemetry publishing |
Konnektr.DigitalTwins.Component.Telemetry | Component telemetry | Component telemetry publishing |
For detailed event examples and payload structures, see the related pages listed under See Also.
Event Routing Configuration
Kafka Configuration
kafka:
  - name: KafkaSink
    brokerList: your-kafka-broker:9092 # Port 9093 will be auto-appended if not specified
    topic: digitaltwins-events
    saslMechanism: PLAIN # or OAUTHBEARER for Azure Event Hubs
    saslUsername: your-username # Required for PLAIN authentication
    saslPassword: your-password # Required for PLAIN authentication
Notes:
- For Azure Event Hubs, use OAUTHBEARER with Azure credentials
- Port 9093 is automatically appended if not specified in brokerList
- SASL/SSL is automatically configured for secure communication
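To verify delivery against a plain Kafka broker, a quick check with the kcat CLI can help. This is a sketch that assumes kcat is installed and reuses the broker and topic names from the example above; add the appropriate -X security properties if your broker requires SASL:

kcat -b your-kafka-broker:9092 -t digitaltwins-events -C -o end -f 'key=%k value=%s\n'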
Kusto Configuration
kusto:
  - name: AdxSink
    ingestionUri: https://ingest-your-cluster.region.kusto.windows.net
    database: your-database
    propertyEventsTable: AdtPropertyEvents # Default table name
    twinLifeCycleEventsTable: AdtTwinLifeCycleEvents # Default table name
    relationshipLifeCycleEventsTable: AdtRelationshipLifeCycleEvents # Default table name
Notes:
- Uses queued ingestion for optimal performance
- Automatically creates JSON ingestion mappings for each event type
- Table names are optional and will use defaults if not specified
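Once events are flowing, a quick sanity check from the Kusto query editor could look like the following sketch (the table name matches the default above; queued ingestion may take a few minutes to surface rows):

AdtTwinLifeCycleEvents
| where ingestion_time() > ago(15m)
| take 10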
MQTT Configuration
mqtt:
  - name: MqttSink
    broker: your-mqtt-broker-host
    port: 1883 # or 8883 for TLS
    topic: digitaltwins/events
    clientId: agedt-client-001
    username: your-username
    password: your-password
    protocolVersion: "5.0.0" # Supports 3.1.0, 3.1.1, or 5.0.0
Notes:
- Uses structured CloudEvents format for MQTT messages
- Supports MQTT v3.1.0, v3.1.1, and v5.0.0
- Automatic reconnection on connection loss
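To watch the routed CloudEvents arriving on the broker, a subscription with the mosquitto_sub CLI can be used. This sketch reuses the broker settings from the example configuration above:

mosquitto_sub -h your-mqtt-broker-host -p 1883 -t 'digitaltwins/events' \
  -u your-username -P your-password -V mqttv5 -v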
Performance and Reliability
- Batching: Events are processed in configurable batches (default 50 events) for optimal performance
- PostgreSQL Logical Replication: Uses PostgreSQL's built-in logical replication for real-time event capture
- Replication Slots: Managed replication slots ensure no event loss during restarts or failovers
- Connection Management: Enhanced timeout settings and automatic reconnection for high-load scenarios
- Health Monitoring: Built-in health checks monitor replication connection status via the IsHealthy property
- Graceful Degradation: Service continues operating even if some sinks are unavailable
- Error Handling: Individual event failures don't stop batch processing
- TCP Keep-Alive: Configured for reliable long-running connections (30-second intervals)
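Because lifecycle capture relies on a logical replication slot, it can be worth checking slot health on the PostgreSQL side. The slot name is managed by AgeDigitalTwins, so this generic query simply lists all slots and the WAL each one retains:

SELECT slot_name, active, restart_lsn,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;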
Event Processing Architecture
The event routing system consists of two main components:
- Replication Producer: Captures changes from PostgreSQL using logical replication and queues events
- Event Consumer: Processes queued events in batches and routes them to configured sinks
Events flow through the following stages:
- Database changes trigger logical replication messages
- Changes are converted to EventData objects and queued
- Consumer processes events in batches and converts them to CloudEvents
- CloudEvents are routed to configured sinks based on event routes
- Each sink handles delivery with appropriate retry logic
See Also
- Telemetry - Learn about publishing and routing telemetry events
- Components - Understand component-specific events and telemetry
- Configuration Reference - Complete event sink configuration options
- API Reference - Event routing API documentation
- CloudEvents Specification - Official CloudEvents standard