Event Routing

AgeDigitalTwins supports real-time event routing to external systems whenever digital twins, relationships, or models are created, updated, or deleted, and whenever telemetry data is published. This enables you to build reactive systems, data pipelines, and analytics solutions.

Overview

Event routing in AgeDigitalTwins works similarly to Azure Digital Twins, with the following key features:

  • Real-time streaming: Events are captured and routed in near real-time using PostgreSQL logical replication and NOTIFY/LISTEN
  • CloudEvents format: All events conform to the CloudEvents specification
  • Multiple sinks: Route events to Kafka, Azure Data Explorer (Kusto), MQTT, and more
  • Event filtering: Configure which events are routed to which sinks
  • Dual event types: Lifecycle events (durable) and telemetry events (real-time)

Event Types

AgeDigitalTwins generates several types of events:

Lifecycle Events

Captured through PostgreSQL logical replication (durable and replicated):

  • Twin Events: TwinCreate, TwinUpdate, TwinDelete
  • Relationship Events: RelationshipCreate, RelationshipUpdate, RelationshipDelete
  • Model Events: ModelCreate, ModelUpdate, ModelDelete

Telemetry Events

Published through PostgreSQL NOTIFY/LISTEN (real-time, ephemeral):

  • Twin Telemetry: Telemetry - General telemetry data for a twin
  • Component Telemetry: ComponentTelemetry - Component-specific telemetry data

Event Architecture

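The event system uses a dual approach: lifecycle events are captured through PostgreSQL logical replication, while telemetry events are published through PostgreSQL NOTIFY/LISTEN. Both paths feed a shared event queue.

This architecture ensures: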

  • Lifecycle events are durable and never lost (important for maintaining state consistency)
  • Telemetry events are delivered with minimal latency (important for real-time monitoring)
  • Unified processing through a shared event queue and consumer
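
The dual approach above can be sketched in a few lines. This is only an illustration of two producers feeding one shared queue, not the service's actual implementation. The function names are hypothetical, and plain Python lists stand in for the replication stream and the NOTIFY/LISTEN channel.

```python
import queue
import threading

# Shared queue that both event paths write to (hypothetical stand-in).
event_queue = queue.Queue()

def lifecycle_producer(changes):
    """Stands in for the logical-replication reader (durable path)."""
    for change in changes:
        event_queue.put({"path": "replication", "eventType": change})

def telemetry_producer(messages):
    """Stands in for the NOTIFY/LISTEN listener (ephemeral path)."""
    for message in messages:
        event_queue.put({"path": "notify", "eventType": message})

def drain():
    """Single consumer: reads both kinds of event from the same queue."""
    events = []
    while True:
        try:
            events.append(event_queue.get_nowait())
        except queue.Empty:
            return events

producers = [
    threading.Thread(target=lifecycle_producer, args=(["TwinCreate", "TwinUpdate"],)),
    threading.Thread(target=telemetry_producer, args=(["Telemetry"],)),
]
for thread in producers:
    thread.start()
for thread in producers:
    thread.join()

consumed = drain()
```

The point of the shared queue is that downstream code (conversion to CloudEvents, routing, delivery) does not need to know which path an event arrived on.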

Supported Event Sinks

Kafka / Azure Event Hubs

Stream events to Apache Kafka or Azure Event Hubs for real-time processing and integration with downstream systems. Perfect for:

  • Real-time analytics and stream processing
  • Integration with microservices architectures
  • Event-driven workflows
  • High-throughput scenarios

Azure Data Explorer (Kusto)

Send events directly to Azure Data Explorer for analytics and time-series analysis, bypassing the need for intermediate Event Hubs. Ideal for:

  • Historical data analysis and reporting
  • Time-series analytics on telemetry data
  • Data exploration and visualization
  • Long-term data retention

MQTT

Route events to MQTT brokers for lightweight messaging and IoT scenarios. Uses structured CloudEvents format over MQTT. Great for:

  • IoT device integration
  • Lightweight pub/sub messaging
  • Edge computing scenarios
  • Mobile and connected device applications

Event Routing Configuration

Basic Event Routes

Define which events go to which sinks using event routes:

eventRoutes:
  # Route all lifecycle events to Kusto for analytics
  - eventType: TwinCreate
    sink: AdxSink
  - eventType: TwinUpdate
    sink: AdxSink
  - eventType: TwinDelete
    sink: AdxSink
    
  # Route telemetry to Kafka for real-time processing
  - eventType: Telemetry
    sink: KafkaSink
  - eventType: ComponentTelemetry
    sink: KafkaSink
    
  # Route all events to MQTT for IoT integration
  - eventType: "*"
    sink: MqttSink
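
To make the matching behaviour concrete, here is a minimal sketch of how routes like the ones above could be resolved to sinks. It assumes that "*" matches every event type and that a sink receives an event at most once. Both are assumptions for illustration, and `sinks_for` is a hypothetical helper rather than part of AgeDigitalTwins.

```python
def sinks_for(event_type, routes):
    """Return the sink names whose route matches event_type.

    Assumption: "*" matches any event type, and duplicates are dropped.
    """
    matched = []
    for route in routes:
        if route["eventType"] in (event_type, "*") and route["sink"] not in matched:
            matched.append(route["sink"])
    return matched

# The routes from the YAML example, written as Python data.
routes = [
    {"eventType": "TwinCreate", "sink": "AdxSink"},
    {"eventType": "TwinUpdate", "sink": "AdxSink"},
    {"eventType": "TwinDelete", "sink": "AdxSink"},
    {"eventType": "Telemetry", "sink": "KafkaSink"},
    {"eventType": "ComponentTelemetry", "sink": "KafkaSink"},
    {"eventType": "*", "sink": "MqttSink"},
]

telemetry_sinks = sinks_for("Telemetry", routes)
twin_create_sinks = sinks_for("TwinCreate", routes)
relationship_sinks = sinks_for("RelationshipCreate", routes)
```

With these routes, a relationship event reaches only the MQTT sink, because no dedicated route exists for it and only the wildcard matches.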

Advanced Routing Patterns

You can create multiple routes for the same event type to send to different sinks:

eventRoutes:
  # Send telemetry to both real-time and analytical sinks
  - eventType: Telemetry
    sink: KafkaSink      # For real-time processing
  - eventType: Telemetry
    sink: AdxSink        # For historical analysis
    
  # Route component telemetry differently based on use case
  - eventType: ComponentTelemetry
    sink: InfluxDbSink   # Time-series database
  - eventType: ComponentTelemetry
    sink: MqttSink       # IoT device notifications

Configuration

Configure event routing in your application settings using the following structure:

events:
  config:
    eventSinks:
      kafka:
        - name: KafkaSink
          brokerList: your-eventhub-namespace.servicebus.windows.net
          topic: your-eventhub-name
          saslMechanism: OAUTHBEARER
      kusto:
        - name: AdxSink
          ingestionUri: https://ingest-your-cluster.region.kusto.windows.net
          database: your-database
      mqtt:
        - name: MqttSink
          broker: your-mqtt-broker-host
          port: 1883
          topic: digitaltwins/events
          clientId: agedt-client
          username: your-username
          password: your-password
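
Because routes refer to sinks by name, a quick consistency check before deployment can catch a route that points at a sink that was never defined. The sketch below is a hypothetical helper, not a built-in feature, and it assumes the configuration has already been loaded into Python dictionaries with the same shape as the YAML above.

```python
def defined_sink_names(event_sinks):
    """Collect the 'name' of every sink across all sink kinds."""
    return {sink["name"] for sinks in event_sinks.values() for sink in sinks}

def undefined_route_sinks(routes, event_sinks):
    """Return the sorted sink names used in routes but missing from event_sinks."""
    known = defined_sink_names(event_sinks)
    return sorted({route["sink"] for route in routes if route["sink"] not in known})

event_sinks = {
    "kafka": [{"name": "KafkaSink"}],
    "kusto": [{"name": "AdxSink"}],
    "mqtt": [{"name": "MqttSink"}],
}
routes = [
    {"eventType": "Telemetry", "sink": "KafkaSink"},
    {"eventType": "ComponentTelemetry", "sink": "InfluxDbSink"},
    {"eventType": "*", "sink": "MqttSink"},
]

missing = undefined_route_sinks(routes, event_sinks)
```

Here `missing` lists `InfluxDbSink`, since that name appears in a route but has no matching entry under `eventSinks`.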

Federated Authentication for Event Sinks (Kusto & Event Hubs)

AgeDigitalTwins uses DefaultAzureCredential for authenticating to Azure services like Kusto (ADX) and Event Hubs (via Kafka). For secure, automated integration, we recommend using Azure AD Workload Identity Federation (OIDC) with a service principal.

1. Create a Service Principal in Your Azure Tenant

You (the customer) should create a service principal (app registration) in your own Azure AD tenant:

az ad app create --display-name "AgeDigitalTwins Event Sink"
az ad sp create --id <appId>

Or via the portal:

  • Go to Azure Active Directory → App registrations → New registration

2. Federate the Service Principal with AgeDigitalTwins

We will provide you with the OIDC issuer URL for our cluster (e.g., https://<your-agedt-cluster>/oidc).

Add a federated credential to your app registration:

  • Go to your app registration in Azure Portal
  • Select Certificates & secrets → Federated credentials → Add credential
  • Set the Issuer to the OIDC issuer URL we provide
  • Set the Subject to the workload identity you want to allow (e.g., system:serviceaccount:<namespace>:<serviceaccount>)
  • Save
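
If you prefer scripting over the portal, the same federated credential can be described as a small JSON document and passed to `az ad app federated-credential create --id <appId> --parameters credential.json`. The sketch below only builds that JSON. The credential name, issuer, namespace, and service account are placeholder values, and `api://AzureADTokenExchange` is the audience normally used for Azure AD workload identity federation, so confirm it against your setup.

```python
import json

def federated_credential(name, issuer, namespace, service_account):
    """Build the parameters document for a federated identity credential."""
    return {
        "name": name,
        "issuer": issuer,
        # Subject format taken from the step above.
        "subject": f"system:serviceaccount:{namespace}:{service_account}",
        "audiences": ["api://AzureADTokenExchange"],
    }

# All values below are placeholders, not real endpoints or accounts.
credential = federated_credential(
    name="agedt-event-sink",
    issuer="https://<your-agedt-cluster>/oidc",
    namespace="<namespace>",
    service_account="<serviceaccount>",
)
credential_json = json.dumps(credential, indent=2)
```

Write `credential_json` to a file such as `credential.json` and reference it from the CLI command.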

3. Assign Roles to the Service Principal

  • For Kusto: Assign the service principal the Contributor or Ingestor role on your Kusto cluster/database.
  • For Event Hubs: Assign the service principal the Azure Event Hubs Data Sender role on the Event Hub namespace.

Example (Kusto):

az kusto database-principal-assignment create \
  --resource-group <rg> \
  --cluster-name <cluster> \
  --database-name <db> \
  --principal-assignment-name <assignmentName> \
  --principal-id <servicePrincipalObjectId> \
  --role Ingestor \
  --principal-type App

Example (Event Hubs):

az role assignment create \
  --assignee <servicePrincipalObjectId> \
  --role "Azure Event Hubs Data Sender" \
  --scope /subscriptions/<sub>/resourceGroups/<rg>/providers/Microsoft.EventHub/namespaces/<namespace>

4. Configure AgeDigitalTwins

No secrets or credentials need to be stored in AgeDigitalTwins. The platform will use the federated identity to obtain tokens via DefaultAzureCredential.

Example event sink config:

eventSinks:
  kusto:
    - name: AdxSink
      ingestionUri: https://ingest-your-cluster.region.kusto.windows.net
      database: your-database
  kafka:
    - name: KafkaSink
      brokerList: your-eventhub-namespace.servicebus.windows.net
      topic: your-eventhub-name
      saslMechanism: OAUTHBEARER

Event Routes

You can define event routes to control which events go to which sinks. Example:

eventRoutes:
  - eventType: TwinCreate
    sink: AdxSink
  - eventType: TwinUpdate
    sink: KafkaSink

Migration Note

If you previously used managed identities or connection strings in Azure Digital Twins, you now use federated credentials for secure, passwordless authentication.

CloudEvents Format

All events in AgeDigitalTwins follow the CloudEvents specification, providing a standardized format for event data across different systems.

Event Structure

Every event includes these standard CloudEvents attributes:

  • specversion: CloudEvents specification version (always "1.0")
  • type: Event type (e.g., "Konnektr.DigitalTwins.Twin.Create")
  • source: Source URI of the AgeDigitalTwins instance
  • id: Unique identifier for the event
  • time: Timestamp when the event occurred
  • subject: The entity that the event is about (twin ID, relationship ID, etc.)
  • datacontenttype: Content type of the event data (always "application/json")
  • data: The actual event payload
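
As an illustration of the attribute list above, the following sketch assembles an envelope with those eight attributes. The source URI, twin ID, and payload contents are invented for the example, and `make_cloud_event` is a hypothetical helper. The real payload structure may differ.

```python
import uuid
from datetime import datetime, timezone

def make_cloud_event(event_type, source, subject, data):
    """Assemble a CloudEvents 1.0 envelope as a plain dictionary."""
    return {
        "specversion": "1.0",
        "type": event_type,
        "source": source,
        "id": str(uuid.uuid4()),                          # unique per event
        "time": datetime.now(timezone.utc).isoformat(),   # RFC 3339 timestamp
        "subject": subject,
        "datacontenttype": "application/json",
        "data": data,
    }

# Placeholder source and payload, for illustration only.
event = make_cloud_event(
    event_type="Konnektr.DigitalTwins.Twin.Create",
    source="https://agedt.example.com",
    subject="thermostat-01",
    data={"$dtId": "thermostat-01"},
)
```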

Event Types

Event Type                                | Description          | Trigger
Konnektr.DigitalTwins.Twin.Create         | Digital twin created | Twin creation via API
Konnektr.DigitalTwins.Twin.Update         | Digital twin updated | Twin or component updates
Konnektr.DigitalTwins.Twin.Delete         | Digital twin deleted | Twin deletion via API
Konnektr.DigitalTwins.Relationship.Create | Relationship created | Relationship creation
Konnektr.DigitalTwins.Relationship.Update | Relationship updated | Relationship property updates
Konnektr.DigitalTwins.Relationship.Delete | Relationship deleted | Relationship deletion
Konnektr.DigitalTwins.Telemetry           | Telemetry published  | Twin telemetry publishing
Konnektr.DigitalTwins.Component.Telemetry | Component telemetry  | Component telemetry publishing

Event Sink Configuration

Kafka Configuration

kafka:
  - name: KafkaSink
    brokerList: your-kafka-broker:9092  # Port 9093 will be auto-appended if not specified
    topic: digitaltwins-events
    saslMechanism: PLAIN  # or OAUTHBEARER for Azure Event Hubs
    saslUsername: your-username  # Required for PLAIN authentication
    saslPassword: your-password  # Required for PLAIN authentication

Notes:

  • For Azure Event Hubs, use OAUTHBEARER with Azure credentials
  • Port 9093 is automatically appended if not specified in brokerList
  • SASL/SSL is automatically configured for secure communication
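
The port behaviour described in the notes can be pictured with a short sketch. This is an illustration of the stated rule (append 9093 when no port is given), not the service's own code. `with_default_port` is a hypothetical helper, and it does not handle IPv6 literals.

```python
def with_default_port(broker_list, default_port=9093):
    """Append default_port to every broker entry that has no explicit port."""
    normalized = []
    for broker in broker_list.split(","):
        broker = broker.strip()
        if not broker:
            continue
        _host, separator, port = broker.rpartition(":")
        if separator and port.isdigit():
            normalized.append(broker)                      # port already present
        else:
            normalized.append(f"{broker}:{default_port}")  # add the default
    return ",".join(normalized)

event_hubs = with_default_port("your-eventhub-namespace.servicebus.windows.net")
kafka = with_default_port("your-kafka-broker:9092")
```

An Event Hubs namespace written without a port therefore ends up on 9093, while a broker that already specifies 9092 is left unchanged.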

Kusto Configuration

kusto:
  - name: AdxSink
    ingestionUri: https://ingest-your-cluster.region.kusto.windows.net
    database: your-database
    propertyEventsTable: AdtPropertyEvents  # Default table name
    twinLifeCycleEventsTable: AdtTwinLifeCycleEvents  # Default table name
    relationshipLifeCycleEventsTable: AdtRelationshipLifeCycleEvents  # Default table name

Notes:

  • Uses queued ingestion for optimal performance
  • Automatically creates JSON ingestion mappings for each event type
  • Table names are optional and will use defaults if not specified

MQTT Configuration

mqtt:
  - name: MqttSink
    broker: your-mqtt-broker-host
    port: 1883  # or 8883 for TLS
    topic: digitaltwins/events
    clientId: agedt-client-001
    username: your-username
    password: your-password
    protocolVersion: "5.0.0"  # Supports 3.1.0, 3.1.1, or 5.0.0

Notes:

  • Uses structured CloudEvents format for MQTT messages
  • Supports MQTT v3.1.0, v3.1.1, and v5.0.0
  • Automatic reconnection on connection loss
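
In structured mode, the whole CloudEvent (attributes and data) is serialized into the MQTT payload, and under MQTT 5 the content type identifies the event format. The sketch below shows that shape without depending on an MQTT client library. The message dictionary and `to_structured_mqtt_message` are hypothetical, so adapt them to whichever client you use on the receiving side.

```python
import json

def to_structured_mqtt_message(event, topic):
    """Serialize a CloudEvent dictionary as a structured-mode MQTT message."""
    return {
        "topic": topic,
        # Media type of the CloudEvents JSON event format.
        "content_type": "application/cloudevents+json; charset=utf-8",
        "payload": json.dumps(event).encode("utf-8"),
    }

def from_structured_mqtt_payload(payload):
    """What a subscriber would do: decode the payload back into a CloudEvent."""
    return json.loads(payload.decode("utf-8"))

# Placeholder event, for illustration only.
event = {
    "specversion": "1.0",
    "type": "Konnektr.DigitalTwins.Telemetry",
    "source": "https://agedt.example.com",
    "id": "example-id",
    "subject": "thermostat-01",
    "datacontenttype": "application/json",
    "data": {"temperature": 21.5},
}
message = to_structured_mqtt_message(event, "digitaltwins/events")
decoded = from_structured_mqtt_payload(message["payload"])
```

A subscriber can read the event type and subject from the decoded payload itself, which is why structured mode also works with MQTT 3.1.1 clients that have no user properties.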

Performance and Reliability

  • Batching: Events are processed in configurable batches (default 50 events) for optimal performance
  • PostgreSQL Logical Replication: Uses PostgreSQL's built-in logical replication for real-time event capture
  • Replication Slots: Managed replication slots ensure no event loss during restarts or failovers
  • Connection Management: Enhanced timeout settings and automatic reconnection for high-load scenarios
  • Health Monitoring: Built-in health checks monitor replication connection status via IsHealthy property
  • Graceful Degradation: Service continues operating even if some sinks are unavailable
  • Error Handling: Individual event failures don't stop batch processing
  • TCP Keep-Alive: Configured for reliable long-running connections (30-second intervals)
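
Two of the points above, batching and per-event error handling, can be illustrated together. This is a sketch under assumptions: the batch size of 50 comes from the list, while `batches`, `process_batch`, and the failing `deliver` function are hypothetical.

```python
def batches(events, size=50):
    """Split a list of events into consecutive batches of at most `size`."""
    for start in range(0, len(events), size):
        yield events[start:start + size]

def process_batch(batch, deliver):
    """Deliver each event; collect failures instead of aborting the batch."""
    failures = []
    for event in batch:
        try:
            deliver(event)
        except Exception as exc:
            failures.append((event, exc))
    return failures

delivered = []

def deliver(event):
    # Hypothetical sink that rejects every 40th event.
    if event % 40 == 0:
        raise ValueError(f"sink rejected event {event}")
    delivered.append(event)

events = list(range(1, 121))
batch_sizes = []
failed = []
for batch in batches(events):
    batch_sizes.append(len(batch))
    failed.extend(event for event, _ in process_batch(batch, deliver))
```

In this run, 120 events are split into batches of 50, 50, and 20, and the three rejected events do not prevent the other 117 from being delivered.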

Event Processing Architecture

The event routing system consists of two main components:

  1. Replication Producer: Captures changes from PostgreSQL using logical replication and queues events
  2. Event Consumer: Processes queued events in batches and routes them to configured sinks

Events flow through the following stages:

  1. Database changes trigger logical replication messages
  2. Changes are converted to EventData objects and queued
  3. Consumer processes events in batches and converts to CloudEvents
  4. CloudEvents are routed to configured sinks based on event routes
  5. Each sink handles delivery with appropriate retry logic
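
The retry policy used by each sink is not specified here. As one possible reading of the last stage, the sketch below retries a failed delivery with exponential backoff and gives up after a fixed number of attempts. The function name, attempt count, and delays are all assumptions.

```python
import time

def deliver_with_retry(send, event, attempts=3, base_delay=0.1, sleep=time.sleep):
    """Call send(event), retrying with exponential backoff on failure.

    Re-raises the last exception once all attempts are used up.
    """
    for attempt in range(attempts):
        try:
            return send(event)
        except Exception:
            if attempt == attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)

# Hypothetical sink that fails twice before succeeding.
calls = {"count": 0}

def flaky_send(event):
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("sink temporarily unavailable")
    return f"delivered {event}"

delays = []
result = deliver_with_retry(flaky_send, "TwinCreate", sleep=delays.append)
```

Passing `sleep` as a parameter keeps the example fast to run: the delays are recorded rather than actually waited for.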
