Data History with Azure Data Explorer
Complete guide to archiving digital twin history in Azure Data Explorer (Kusto) or Microsoft Fabric Real-Time Intelligence for temporal queries and analytics.
Overview
This guide shows you how to use Event Routing with Azure Data Explorer to create a complete, queryable history of your digital twin graph.
Once configured, every change to your graph—property updates, twin lifecycle events, relationship changes—is automatically archived to Kusto tables optimized for time-series queries.
What You'll Learn
- Configure Data History routing to Azure Data Explorer
- Understand the data schema and tables
- Run temporal queries to reconstruct historical state
- Use Kusto's graph capabilities for historical analysis
- Optimize performance and manage retention
- Query vector embeddings across time
Prerequisites
Azure Data Explorer:
- Azure Data Explorer cluster with public network access
- Database in the cluster
- Service principal with Database Ingestor role
Microsoft Fabric (Alternative):
- Fabric workspace with Real-Time Intelligence enabled
- KQL database created
- Workspace permissions configured
Konnektr Graph:
- Standard tier ($99/mo) or higher
- Access to Konnektr Portal for configuration
Setup
Step 1: Create Service Principal (Microsoft Entra)
Create a service principal for authentication:
```bash
# Create service principal
az ad sp create-for-rbac --name "konnektr-graph-history"

# Note the output:
# - appId (clientId)
# - password (clientSecret)
# - tenant (tenantId)
```

Grant the Database Ingestor role:

```bash
az kusto database principal-assignment create \
  --cluster-name <cluster-name> \
  --database-name <database-name> \
  --principal-id <service-principal-app-id> \
  --principal-type App \
  --role "Database Ingestor"
```

Step 2: Configure Event Sink
In ktrlplane:
- Navigate to your Graph instance
- Go to Settings > Event Sinks
- Click Add Sink > Azure Data Explorer
- Enter configuration:
  - Name: DataHistorySink
  - Ingestion URI: https://ingest-<cluster>.<region>.kusto.windows.net
  - Database: Your database name
  - Client ID: Service principal app ID
  - Client Secret: Service principal password
  - Tenant ID: Azure AD tenant ID
- Click Test Connection to verify
- Save
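Before saving, it can help to sanity-check that you copied the ingestion endpoint rather than the query endpoint. A minimal Python sketch (the function and regex are illustrative helpers, not part of any Konnektr or Azure SDK):

```python
import re

def is_valid_ingestion_uri(uri: str) -> bool:
    """Check that a URI looks like a Kusto ingestion endpoint:
    https://ingest-<cluster>.<region>.kusto.windows.net"""
    pattern = r"^https://ingest-[a-z0-9][a-z0-9-]*\.[a-z0-9]+\.kusto\.windows\.net$"
    return re.match(pattern, uri) is not None

# The ingestion endpoint carries an "ingest-" prefix; the query endpoint does not.
print(is_valid_ingestion_uri("https://ingest-mycluster.westeurope.kusto.windows.net"))  # True
print(is_valid_ingestion_uri("https://mycluster.westeurope.kusto.windows.net"))         # False
```

Pointing the sink at the query endpoint is a common cause of silent ingestion failures.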
Step 3: Create Event Route
- Go to Settings > Event Routes
- Click Add Route
- Configure:
  - Event Format: DataHistory
  - Destination Sink: DataHistorySink
- Save and activate
Step 4: Verify Tables
Konnektr automatically creates three tables in your database. Verify in Kusto query editor:
```kusto
.show tables
```

You should see:
- AdtPropertyEvents
- AdtTwinLifeCycleEvents
- AdtRelationshipLifeCycleEvents
Data will start flowing immediately. Check within a few minutes:
```kusto
AdtPropertyEvents
| take 10
```

Data Schema
AdtPropertyEvents
Stores twin and relationship property changes over time.
Schema:
| Column | Type | Description |
|---|---|---|
| TimeStamp | datetime | When the property was updated |
| ServiceId | string | Graph instance URI |
| Id | string | Twin ID |
| ModelId | string | DTDL model ID (DTMI) |
| Key | string | Property name |
| Value | dynamic | Property value (supports JSON) |
| RelationshipId | string | Relationship ID (empty for twin properties) |
| RelationshipTarget | string | Target twin ID (empty for twin properties) |
Example data:
| TimeStamp | Id | ModelId | Key | Value |
|---|---|---|---|---|
| 2025-01-20 10:30:00 | sensor-temp-01 | dtmi:com:example:Sensor;1 | temperature | 72.5 |
| 2025-01-20 10:31:00 | sensor-temp-01 | dtmi:com:example:Sensor;1 | temperature | 73.2 |
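To make the row shape concrete for downstream consumers, here is a sketch of one AdtPropertyEvents record as a plain Python dict (field names follow the schema above; the ServiceId value is illustrative):

```python
from datetime import datetime, timezone

# One AdtPropertyEvents row. Empty RelationshipId/RelationshipTarget
# means this event describes a twin property, not a relationship property.
property_event = {
    "TimeStamp": datetime(2025, 1, 20, 10, 30, tzinfo=timezone.utc),
    "ServiceId": "https://my-graph.example.konnektr.io",  # illustrative URI
    "Id": "sensor-temp-01",
    "ModelId": "dtmi:com:example:Sensor;1",
    "Key": "temperature",
    "Value": 72.5,           # dynamic: may also be a JSON object or array
    "RelationshipId": "",
    "RelationshipTarget": "",
}

def is_twin_property(event: dict) -> bool:
    """Distinguish twin-property events from relationship-property events."""
    return event["RelationshipId"] == ""

print(is_twin_property(property_event))  # True
```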
AdtTwinLifeCycleEvents
Stores twin creation and deletion events.
Schema:
| Column | Type | Description |
|---|---|---|
| TwinId | string | Twin ID |
| Action | string | Create or Delete |
| TimeStamp | datetime | When the event occurred |
| ServiceId | string | Graph instance URI |
| ModelId | string | DTDL model ID (DTMI) |
Example data:
| TwinId | Action | TimeStamp | ModelId |
|---|---|---|---|
| pump-01 | Create | 2025-01-20 09:00:00 | dtmi:com:example:Pump;1 |
| pump-02 | Create | 2025-01-20 09:05:00 | dtmi:com:example:Pump;1 |
AdtRelationshipLifeCycleEvents
Stores relationship creation and deletion events.
Schema:
| Column | Type | Description |
|---|---|---|
| RelationshipId | string | Relationship ID |
| Name | string | Relationship name (type) |
| Action | string | Create or Delete |
| TimeStamp | datetime | When the event occurred |
| ServiceId | string | Graph instance URI |
| Source | string | Source twin ID |
| Target | string | Target twin ID |
Example data:
| RelationshipId | Name | Action | Source | Target |
|---|---|---|---|---|
| rel-001 | feeds | Create | pump-01 | tank-01 |
| rel-002 | monitors | Create | sensor-01 | pump-01 |
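The lifecycle tables are append-only, so the set of live relationships at any moment is derived by replaying events and keeping only the latest action per relationship. A minimal Python sketch of that replay (the event tuples are illustrative):

```python
def live_relationships(events):
    """events: iterable of (timestamp, relationship_id, action, source, target).
    Returns {relationship_id: (source, target)} for relationships whose
    latest event is not a Delete."""
    latest = {}
    for ts, rel_id, action, source, target in sorted(events):
        latest[rel_id] = (action, source, target)  # later events overwrite earlier
    return {
        rel_id: (source, target)
        for rel_id, (action, source, target) in latest.items()
        if action != "Delete"
    }

events = [
    ("2025-01-20T09:00", "rel-001", "Create", "pump-01", "tank-01"),
    ("2025-01-20T09:05", "rel-002", "Create", "sensor-01", "pump-01"),
    ("2025-01-20T11:00", "rel-001", "Delete", "pump-01", "tank-01"),
]
print(live_relationships(events))  # {'rel-002': ('sensor-01', 'pump-01')}
```

This is the same reduction the KQL queries later in this guide perform with `arg_max(TimeStamp, *)` followed by `where Action != 'Delete'`.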
Temporal Queries
Current State Reconstruction
Get current property values (most recent for each twin):
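The query below relies on `arg_max(TimeStamp, *) by Id, Key`, which is last-write-wins per (twin, property) pair. The same reduction in Python terms (event tuples are illustrative):

```python
def latest_values(events):
    """events: iterable of (timestamp, twin_id, key, value).
    Returns {(twin_id, key): (value, timestamp)}, keeping only the most
    recent event per (twin_id, key) pair, like arg_max(TimeStamp, *)."""
    state = {}
    for ts, twin_id, key, value in events:
        current = state.get((twin_id, key))
        if current is None or ts > current[1]:
            state[(twin_id, key)] = (value, ts)
    return state

events = [
    ("2025-01-20T10:30", "sensor-temp-01", "temperature", 72.5),
    ("2025-01-20T10:31", "sensor-temp-01", "temperature", 73.2),
]
print(latest_values(events))
# {('sensor-temp-01', 'temperature'): (73.2, '2025-01-20T10:31')}
```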
```kusto
AdtPropertyEvents
| summarize arg_max(TimeStamp, *) by Id, Key
| project Id, Key, Value, LastUpdate=TimeStamp
```

Historical State at Specific Time
Reconstruct all property values as they existed at a specific timestamp:
```kusto
let targetTime = datetime(2025-01-15 14:30:00);
AdtPropertyEvents
| where TimeStamp <= targetTime
| summarize arg_max(TimeStamp, *) by Id, Key
| project Id, Key, Value, TimeStamp
```

Find Changes in Time Range
Identify which twins changed within a period:
```kusto
AdtPropertyEvents
| where TimeStamp between (datetime(2025-01-15 08:00) .. datetime(2025-01-15 17:00))
| summarize ChangeCount=count(), Properties=make_set(Key) by Id, ModelId
| order by ChangeCount desc
```

Property History Over Time
Track how a specific property evolved:
```kusto
AdtPropertyEvents
| where Id == "sensor-temp-01"
| where Key == "temperature"
| project TimeStamp, Value
| order by TimeStamp asc
| render timechart
```

Detect Anomalies
Find twins where a property exceeded a threshold:
```kusto
AdtPropertyEvents
| where Key == "temperature"
| where todouble(Value) > 80
| where TimeStamp > ago(24h)
| project TimeStamp, Id, Temperature=todouble(Value)
| order by TimeStamp desc
```

Before/After Comparison
Compare state before and after an incident:
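The comparison joins two point-in-time reconstructions and keeps only rows whose value changed. A Python sketch of the same diff, assuming each snapshot is a {(twin, key): value} dict as produced by a last-write-wins reduction:

```python
def diff_states(before, after):
    """before/after: {(twin_id, key): value} snapshots.
    Returns {(twin_id, key): (before_value, after_value)} for pairs present
    in both snapshots whose value changed (an inner join on twin and key)."""
    return {
        k: (before[k], after[k])
        for k in before.keys() & after.keys()
        if before[k] != after[k]
    }

before = {("pump-01", "pressure"): 4.2, ("pump-01", "status"): "running"}
after = {("pump-01", "pressure"): 9.7, ("pump-01", "status"): "running"}
print(diff_states(before, after))  # {('pump-01', 'pressure'): (4.2, 9.7)}
```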
```kusto
let incidentTime = datetime(2025-01-15 10:45:00);
let beforeState = AdtPropertyEvents
| where TimeStamp < incidentTime
| summarize arg_max(TimeStamp, *) by Id, Key;
let afterState = AdtPropertyEvents
| where TimeStamp >= incidentTime and TimeStamp < incidentTime + 5m
| summarize arg_max(TimeStamp, *) by Id, Key;
beforeState
| join kind=inner afterState on Id, Key
| where Value != Value1
| project Id, Key, BeforeValue=Value, AfterValue=Value1, BeforeTime=TimeStamp, AfterTime=TimeStamp1
```

Graph Reconstruction
Kusto's make-graph operator allows you to reconstruct your digital twin graph structure and run graph queries on historical data.
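Conceptually, make-graph turns the relationship rows into an adjacency structure that graph queries then traverse. A Python sketch of that reconstruction plus a simple downstream traversal (edge tuples are illustrative):

```python
from collections import defaultdict, deque

def downstream(edges, start):
    """edges: iterable of (source, target) pairs.
    Returns every node reachable from start by following edges forward (BFS),
    the same question the impact-analysis query below asks of the graph."""
    adjacency = defaultdict(list)
    for source, target in edges:
        adjacency[source].append(target)
    seen, queue = set(), deque([start])
    while queue:
        node = queue.popleft()
        for nxt in adjacency[node]:
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen

edges = [("pump-01", "tank-01"), ("tank-01", "valve-01"), ("sensor-01", "pump-01")]
print(sorted(downstream(edges, "pump-01")))  # ['tank-01', 'valve-01']
```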
Create AdtGraph Function
Create a function that reconstructs the current graph state:
```kusto
.create-or-alter function AdtGraph() {
    let twins = materialize(
        AdtTwinLifeCycleEvents
        | summarize arg_max(TimeStamp, *) by TwinId
        | where Action != 'Delete'
    );
    let twinIds = twins | project TwinId;
    let relationships = AdtRelationshipLifeCycleEvents
        | summarize arg_max(TimeStamp, *) by Source, Target
        | where Source in (twinIds) and Target in (twinIds)
        | where Action != 'Delete';
    relationships
    | make-graph Source-->Target with twins on TwinId
}
```

Query Current Graph
Find all twins connected to a specific twin:
```kusto
AdtGraph()
| graph-match (source)-[rel]->(target)
  where source.TwinId == "pump-01"
  project Target=target.TwinId, Relationship=rel.Name
```

Historical Graph Reconstruction
Create a time-aware version that reconstructs the graph at a specific timestamp:
```kusto
.create-or-alter function AdtGraphAt(targetTime: datetime) {
    let twins = materialize(
        AdtTwinLifeCycleEvents
        | where TimeStamp <= targetTime
        | summarize arg_max(TimeStamp, *) by TwinId
        | where Action != 'Delete'
    );
    let twinIds = twins | project TwinId;
    let relationships = AdtRelationshipLifeCycleEvents
        | where TimeStamp <= targetTime
        | summarize arg_max(TimeStamp, *) by Source, Target
        | where Source in (twinIds) and Target in (twinIds)
        | where Action != 'Delete';
    relationships
    | make-graph Source-->Target with twins on TwinId
}
```

Query the graph as it existed on January 15 at 10:00 AM:
```kusto
AdtGraphAt(datetime(2025-01-15 10:00:00))
| graph-match (source)-[rel]->(target)
  where source.TwinId == "pump-01"
  project Target=target.TwinId, Relationship=rel.Name
```

Shortest Path Analysis
Find shortest path between two assets at a historical timestamp:
```kusto
AdtGraphAt(datetime(2025-01-15 10:00:00))
| graph-match (a)-[edge*1..5]->(b)
  where a.TwinId == "asset-a" and b.TwinId == "asset-b"
  project Path=map(edge, Name), PathLength=array_length(map(edge, Name))
| summarize arg_min(PathLength, *)
```

Impact Analysis
Find all downstream assets affected by a failure at a historical time:
```kusto
let failureTime = datetime(2025-01-15 12:30:00);
AdtGraphAt(failureTime)
| graph-match (failed)-[feed*1..10]->(affected)
  where failed.TwinId == "pump-01" and all(feed, Name == "feeds")
  project AffectedAsset=affected.TwinId
| distinct AffectedAsset
```

Connected Components
Identify isolated subgraphs at a specific time:
```kusto
AdtGraphAt(datetime(2025-01-15 10:00:00))
| graph-match (n1)-[e*1..10]->(n2)
  project Component=tostring(n1.TwinId), Member=tostring(n2.TwinId)
| summarize Members=array_sort_asc(make_set(Member)) by Component
```

This groups each twin with everything reachable from it; twins whose member sets do not overlap belong to isolated subgraphs.

Vector Embeddings and Similarity Search
If you're storing vector embeddings as twin properties (using PostgreSQL's pgvector), those embeddings are also historized. Kusto supports vector operations for similarity analysis.
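Kusto's series_cosine_similarity is ordinary cosine similarity over two numeric vectors. A dependency-free Python sketch of the computation it performs:

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity of two equal-length vectors: dot(a, b) / (|a| * |b|).
    Returns 1.0 for identical directions, 0.0 for orthogonal vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # 1.0
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # 0.0
```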
Find Similar Historical States
```kusto
// Get current embedding for a twin
let currentEmbedding = toscalar(
    AdtPropertyEvents
    | where Id == "pump-01"
    | where Key == "maintenanceEmbedding"
    | summarize arg_max(TimeStamp, *)
    | project Embedding=todynamic(Value));
// Find historical states with similar embeddings
AdtPropertyEvents
| where Key == "maintenanceEmbedding"
| where TimeStamp < ago(30d)
| extend EmbeddingArray = todynamic(Value)
| extend Similarity = series_cosine_similarity(EmbeddingArray, currentEmbedding)
| top 10 by Similarity desc
| project TimeStamp, Id, Similarity
```

Cluster Failure Modes by Embedding
Group historical failures into clusters based on embedding similarity:
```kusto
AdtPropertyEvents
| where Key == "failureStateEmbedding"
| where TimeStamp > ago(180d)
| extend Embedding = todynamic(Value)
// Simple bucketing by embedding hash (a coarse stand-in for clustering)
| summarize Occurrences=count(),
            Examples=make_set(Id, 3),
            LastSeen=max(TimeStamp)
  by Cluster = hash(tostring(Embedding), 1000) % 5
| order by Occurrences desc
```

Temporal Embedding Drift
Track how an asset's embedding has changed over time:
```kusto
AdtPropertyEvents
| where Id == "pump-01"
| where Key == "stateEmbedding"
| order by TimeStamp asc
| extend PrevEmbedding = prev(todynamic(Value), 1)
| extend CurrentEmbedding = todynamic(Value)
| where isnotnull(PrevEmbedding)
| extend Similarity = series_cosine_similarity(CurrentEmbedding, PrevEmbedding)
| project TimeStamp, Similarity
| render timechart
```

Performance Optimization
Ingestion Performance
Streaming Ingestion (Low latency, <4 GB/hour):
```kusto
.alter table AdtPropertyEvents policy streamingingestion enable
.alter table AdtTwinLifeCycleEvents policy streamingingestion enable
.alter table AdtRelationshipLifeCycleEvents policy streamingingestion enable
```

- Latency: <10 seconds
- Best for: Real-time dashboards, immediate queries
Batch Ingestion (High throughput, >4 GB/hour):
```kusto
.alter table AdtPropertyEvents policy ingestionbatching
@'{"MaximumBatchingTimeSpan":"00:00:30", "MaximumNumberOfItems": 10000, "MaximumRawDataSizeMB": 1024}'
```

- Latency: 30 seconds to 15 minutes (configurable)
- Best for: High-volume archival
Query Performance
Partition by Time:
```kusto
.alter table AdtPropertyEvents policy partitioning
@'{"PartitionKeys":[{"ColumnName":"TimeStamp","Kind":"UniformRange","Properties":{"RangeSize":"1.00:00:00"}}]}'
```

Partitions data by day, which dramatically speeds up time-range queries.
Materialized View for Current State:
```kusto
.create materialized-view LatestPropertyValues on table AdtPropertyEvents
{
    AdtPropertyEvents
    | summarize arg_max(TimeStamp, *) by Id, Key
}
```

Pre-aggregates the latest values, giving instant "current state" queries without a full table scan.
Update Policy for Real-Time Aggregations:
```kusto
// Create target table for aggregated metrics
.create table PropertyMetrics (
    Id: string,
    Key: string,
    Hour: datetime,
    AvgValue: real,
    MinValue: real,
    MaxValue: real
)

// Create an update policy on the target table to aggregate on ingestion
.alter table PropertyMetrics policy update
@'[{"Source": "AdtPropertyEvents", "Query": "AdtPropertyEvents | where Key in (\"temperature\", \"pressure\") | summarize AvgValue=avg(todouble(Value)), MinValue=min(todouble(Value)), MaxValue=max(todouble(Value)) by Id, Key, Hour=bin(TimeStamp, 1h)", "IsEnabled": true, "IsTransactional": false}]'
```

Note that the update policy is defined on the target table (PropertyMetrics); the source table is named inside the policy.

Data Retention
Configure retention policies based on compliance and storage requirements:
Set Retention Period:
```kusto
.alter-merge table AdtPropertyEvents policy retention
@'{"SoftDeletePeriod":"365.00:00:00", "Recoverability":"Enabled"}'
```

Retains data for 365 days, then soft-deletes it (recoverable for an additional period).
Hot/Cold Cache:
```kusto
.alter table AdtPropertyEvents policy caching hot = 30d
```

Keeps the last 30 days in hot cache (fast queries); older data stays in cold storage (slower but cheaper).
Export Old Data to Data Lake:
```kusto
.export async compressed to parquet (
    h@'https://storage.blob.core.windows.net/archive?<SAS>'
) with (namePrefix="history", includeHeaders="all") <|
AdtPropertyEvents
| where TimeStamp < ago(365d)
```

Common Use Cases
Compliance Audit Trail
Requirement: Prove system state during inspection
```kusto
let inspectionTime = datetime(2025-01-15 14:00:00);
AdtPropertyEvents
| where TimeStamp <= inspectionTime
| summarize arg_max(TimeStamp, *) by Id, Key
| where Id in ("critical-asset-01", "critical-asset-02")
| project Id, Key, Value, LastUpdate=TimeStamp
| order by Id, Key
```

Export to CSV/PDF for regulatory submission.
Root Cause Analysis
Incident: Pressure spike at 10:45 AM
```kusto
let incidentTime = datetime(2025-01-15 10:45:00);
// What changed in the 5 minutes before the incident?
AdtPropertyEvents
| where TimeStamp between ((incidentTime - 5m) .. incidentTime)
| where Key in ("pressure", "flowRate", "valvePosition")
| project TimeStamp, Id, Key, Value
| order by TimeStamp asc
```

Predictive Maintenance Pattern Matching
Find assets with degradation patterns similar to failed assets
```kusto
// Define the degradation pattern from a known failure
let failedPattern = toscalar(
    AdtPropertyEvents
    | where Id == "pump-failed-01"
    | where Key == "vibration"
    | where TimeStamp between (ago(30d) .. ago(1d))
    | summarize make_list(todouble(Value)));
// Find current assets with similar patterns
AdtPropertyEvents
| where Key == "vibration" and Id != "pump-failed-01"
| where TimeStamp > ago(30d)
| summarize CurrentPattern=make_list(todouble(Value)) by Id
| extend Similarity = series_cosine_similarity(CurrentPattern, failedPattern)
| where Similarity > 0.85
| project Id, Similarity
| order by Similarity desc
```

Note that cosine similarity assumes the series are comparable in length; resample to a common grid (for example, hourly bins with avg) if sampling rates differ.

AI Agent Historical Context
Agent query: "Has this anomaly type occurred before?"
```kusto
// Current anomaly: temperature spike to 95°C
let anomalyThreshold = 95.0;
AdtPropertyEvents
| where Key == "temperature"
| where todouble(Value) >= anomalyThreshold
| where TimeStamp < ago(1h) // Historical only
| summarize
    OccurrenceCount=count(),
    LastOccurrence=max(TimeStamp),
    AffectedTwins=dcount(Id),
    ExampleTwins=make_set(Id, 3)
```

The agent uses this to inform its decision: "Yes, it occurred 3 times before, most recently 2 weeks ago."
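The same aggregation can be mirrored client-side when an agent works from raw events it has already fetched. A Python sketch (the event tuples are illustrative):

```python
def anomaly_history(events, threshold):
    """events: iterable of (timestamp, twin_id, value).
    Returns (occurrence_count, last_occurrence, affected_twins) for events
    at or above threshold, mirroring the KQL summarize above."""
    hits = [(ts, twin) for ts, twin, value in events if value >= threshold]
    if not hits:
        return (0, None, 0)
    return (len(hits),
            max(ts for ts, _ in hits),
            len({twin for _, twin in hits}))

events = [
    ("2025-01-01T08:00", "boiler-01", 96.0),
    ("2025-01-07T12:00", "boiler-02", 97.5),
    ("2025-01-14T09:00", "boiler-01", 95.2),
    ("2025-01-14T10:00", "boiler-01", 80.0),  # below threshold, ignored
]
print(anomaly_history(events, 95.0))  # (3, '2025-01-14T09:00', 2)
```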
Troubleshooting
No Data Appearing
Check ingestion status:
```kusto
.show ingestion failures
| where Table in ("AdtPropertyEvents", "AdtTwinLifeCycleEvents", "AdtRelationshipLifeCycleEvents")
| top 10 by FailedOn desc
```

Verify event route:
- Confirm the route is active in ktrlplane
- Check that eventFormat is set to DataHistory
- Verify the sink name matches
Test service principal permissions:
```bash
az kusto database principal-assignment list \
  --cluster-name <cluster> \
  --database-name <database>
```

High Ingestion Latency
Switch to streaming ingestion:
```kusto
.alter table AdtPropertyEvents policy streamingingestion enable
```

Reduce the batching interval:

```kusto
.alter table AdtPropertyEvents policy ingestionbatching
@'{"MaximumBatchingTimeSpan":"00:00:10"}'
```

Wait 5-10 minutes for the policy to take effect.
Slow Queries
Check if partitioning is enabled:
```kusto
.show table AdtPropertyEvents policy partitioning
```

Review recent slow queries:
```kusto
.show queries
| where Database == "<your-database>"
| where Duration > 10s
| project StartedOn, Text, Duration
| order by StartedOn desc
```

Use materialized views for frequent queries: see the Query Performance section.
Next Steps
- Event Routing - Configure other event destinations
- Kusto Query Language Tutorial - Learn more KQL
- Graph Operators in Kusto - Advanced graph queries
- Azure Data Explorer Best Practices - Optimization tips