
Data History with Azure Data Explorer

Complete guide to archiving digital twin history in Azure Data Explorer (Kusto) or Microsoft Fabric Real-Time Intelligence for temporal queries and analytics.

Overview

This guide shows you how to use Event Routing with Azure Data Explorer to create a complete, queryable history of your digital twin graph.

Once configured, every change to your graph—property updates, twin lifecycle events, relationship changes—is automatically archived to Kusto tables optimized for time-series queries.

What You'll Learn

  • Configure Data History routing to Azure Data Explorer
  • Understand the data schema and tables
  • Run temporal queries to reconstruct historical state
  • Use Kusto's graph capabilities for historical analysis
  • Optimize performance and manage retention
  • Query vector embeddings across time

Prerequisites

Azure Data Explorer:

  • Azure Data Explorer cluster with public network access
  • Database in the cluster
  • Service principal with Database Ingestor role

Microsoft Fabric (Alternative):

  • Fabric workspace with Real-Time Intelligence enabled
  • KQL database created
  • Workspace permissions configured

Konnektr Graph:

  • Standard tier ($99/mo) or higher
  • Access to Konnektr Portal for configuration

Setup

Step 1: Create Service Principal (Microsoft Entra)

Create a service principal for authentication:

# Create service principal
az ad sp create-for-rbac --name "konnektr-graph-history"

# Note the output:
# - appId (clientId)
# - password (clientSecret)
# - tenant (tenantId)

Grant Database Ingestor role:

az kusto database principal-assignment create \
  --cluster-name <cluster-name> \
  --database-name <database-name> \
  --principal-id <service-principal-app-id> \
  --principal-type App \
  --role "Database Ingestor"

Step 2: Configure Event Sink

In ktrlplane:

  1. Navigate to your Graph instance
  2. Go to Settings > Event Sinks
  3. Click Add Sink > Azure Data Explorer
  4. Enter configuration:
    • Name: DataHistorySink
    • Ingestion URI: https://ingest-<cluster>.<region>.kusto.windows.net
    • Database: Your database name
    • Client ID: Service principal app ID
    • Client Secret: Service principal password
    • Tenant ID: Azure AD tenant ID
  5. Click Test Connection to verify
  6. Save

Step 3: Create Event Route

  1. Go to Settings > Event Routes
  2. Click Add Route
  3. Configure:
    • Event Format: DataHistory
    • Destination Sink: DataHistorySink
  4. Save and activate

Step 4: Verify Tables

Konnektr automatically creates three tables in your database. Verify in Kusto query editor:

.show tables

You should see:

  • AdtPropertyEvents
  • AdtTwinLifeCycleEvents
  • AdtRelationshipLifeCycleEvents

Data will start flowing immediately. Check within a few minutes:

AdtPropertyEvents
| take 10

Data Schema

AdtPropertyEvents

Stores twin and relationship property changes over time.

Schema:

Column              Type      Description
TimeStamp           datetime  When the property was updated
ServiceId           string    Graph instance URI
Id                  string    Twin ID
ModelId             string    DTDL model ID (DTMI)
Key                 string    Property name
Value               dynamic   Property value (supports JSON)
RelationshipId      string    Relationship ID (empty for twin properties)
RelationshipTarget  string    Target twin ID (empty for twin properties)

Example data:

TimeStamp            Id              ModelId                    Key          Value
2025-01-20 10:30:00  sensor-temp-01  dtmi:com:example:Sensor;1  temperature  72.5
2025-01-20 10:31:00  sensor-temp-01  dtmi:com:example:Sensor;1  temperature  73.2
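
Because RelationshipId is populated only for relationship properties, the same table can be split into twin-level and relationship-level changes. A quick sketch:

// Twin property changes only
AdtPropertyEvents
| where isempty(RelationshipId)
| take 10

// Relationship property changes only
AdtPropertyEvents
| where isnotempty(RelationshipId)
| take 10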

AdtTwinLifeCycleEvents

Stores twin creation and deletion events.

Schema:

Column     Type      Description
TwinId     string    Twin ID
Action     string    Create or Delete
TimeStamp  datetime  When the event occurred
ServiceId  string    Graph instance URI
ModelId    string    DTDL model ID (DTMI)

Example data:

TwinId   Action  TimeStamp            ModelId
pump-01  Create  2025-01-20 09:00:00  dtmi:com:example:Pump;1
pump-02  Create  2025-01-20 09:05:00  dtmi:com:example:Pump;1
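
Because only lifecycle events are stored, the set of twins that currently exist is the latest event per twin that is not a Delete. For example, to count live twins per model:

AdtTwinLifeCycleEvents
| summarize arg_max(TimeStamp, *) by TwinId
| where Action != "Delete"
| summarize LiveTwins=count() by ModelId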

AdtRelationshipLifeCycleEvents

Stores relationship creation and deletion events.

Schema:

Column          Type      Description
RelationshipId  string    Relationship ID
Name            string    Relationship name (type)
Action          string    Create or Delete
TimeStamp       datetime  When the event occurred
ServiceId       string    Graph instance URI
Source          string    Source twin ID
Target          string    Target twin ID

Example data:

RelationshipId  Name      Action  Source     Target
rel-001         feeds     Create  pump-01    tank-01
rel-002         monitors  Create  sensor-01  pump-01
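
The same latest-event-per-key pattern yields the relationships that currently exist:

AdtRelationshipLifeCycleEvents
| summarize arg_max(TimeStamp, *) by RelationshipId
| where Action != "Delete"
| project RelationshipId, Name, Source, Target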

Temporal Queries

Current State Reconstruction

Get current property values (most recent for each twin):

AdtPropertyEvents
| summarize arg_max(TimeStamp, *) by Id, Key
| project Id, Key, Value, LastUpdate=TimeStamp

Historical State at Specific Time

Reconstruct all property values as they existed at a specific timestamp:

let targetTime = datetime(2025-01-15 14:30:00);
AdtPropertyEvents
| where TimeStamp <= targetTime
| summarize arg_max(TimeStamp, *) by Id, Key
| project Id, Key, Value, TimeStamp

Find Changes in Time Range

Identify which twins changed within a period:

AdtPropertyEvents
| where TimeStamp between (datetime(2025-01-15 08:00) .. datetime(2025-01-15 17:00))
| summarize ChangeCount=count(), Properties=make_set(Key) by Id, ModelId
| order by ChangeCount desc

Property History Over Time

Track how a specific property evolved:

AdtPropertyEvents
| where Id == "sensor-temp-01"
| where Key == "temperature"
| project TimeStamp, Value
| order by TimeStamp asc
| render timechart

Detect Anomalies

Find twins where a property exceeded a threshold:

AdtPropertyEvents
| where Key == "temperature"
| where todouble(Value) > 80
| where TimeStamp > ago(24h)
| project TimeStamp, Id, Temperature=todouble(Value)
| order by TimeStamp desc

Before/After Comparison

Compare state before and after an incident:

let incidentTime = datetime(2025-01-15 10:45:00);
let beforeState = AdtPropertyEvents
    | where TimeStamp < incidentTime
    | summarize arg_max(TimeStamp, *) by Id, Key;
let afterState = AdtPropertyEvents
    | where TimeStamp >= incidentTime and TimeStamp < incidentTime + 5m
    | summarize arg_max(TimeStamp, *) by Id, Key;
beforeState
| join kind=inner afterState on Id, Key
| where tostring(Value) != tostring(Value1)
| project Id, Key, BeforeValue=Value, AfterValue=Value1, BeforeTime=TimeStamp, AfterTime=TimeStamp1

Graph Reconstruction

Kusto's make-graph operator allows you to reconstruct your digital twin graph structure and run graph queries on historical data.

Create AdtGraph Function

Create a function that reconstructs the current graph state:

.create-or-alter function AdtGraph() {
    let twins = materialize(
        AdtTwinLifeCycleEvents
        | summarize arg_max(TimeStamp, *) by TwinId
        | where Action != 'Delete'
    );
    let twinIds = twins | project TwinId;
    let relationships = AdtRelationshipLifeCycleEvents
        | summarize arg_max(TimeStamp, *) by Source, Target
        | where Source in (twinIds) and Target in (twinIds)
        | where Action != 'Delete';
    relationships
    | make-graph Source-->Target with twins on TwinId
}

Query Current Graph

Find all twins connected to a specific twin:

AdtGraph()
| graph-match (source)-[rel]->(target)
  where source.TwinId == "pump-01"
  project Target=target.TwinId, Relationship=rel.Name

Historical Graph Reconstruction

Create a time-aware version that reconstructs the graph at a specific timestamp:

.create-or-alter function AdtGraphAt(targetTime: datetime) {
    let twins = materialize(
        AdtTwinLifeCycleEvents
        | where TimeStamp <= targetTime
        | summarize arg_max(TimeStamp, *) by TwinId
        | where Action != 'Delete'
    );
    let twinIds = twins | project TwinId;
    let relationships = AdtRelationshipLifeCycleEvents
        | where TimeStamp <= targetTime
        | summarize arg_max(TimeStamp, *) by Source, Target
        | where Source in (twinIds) and Target in (twinIds)
        | where Action != 'Delete';
    relationships
    | make-graph Source-->Target with twins on TwinId
}

Query the graph as it existed on January 15 at 10:00 AM:

AdtGraphAt(datetime(2025-01-15 10:00:00))
| graph-match (source)-[rel]->(target)
  where source.TwinId == "pump-01"
  project Target=target.TwinId, Relationship=rel.Name

Shortest Path Analysis

Find shortest path between two assets at a historical timestamp:

AdtGraphAt(datetime(2025-01-15 10:00:00))
| graph-match (a)-[edge*1..5]->(b)
  where a.TwinId == "asset-a" and b.TwinId == "asset-b"
  project Path=map(edge, Name)
| extend PathLength=array_length(Path)
| top 1 by PathLength asc
| project Path, PathLength

Impact Analysis

Find all downstream assets affected by a failure at a historical time:

let failureTime = datetime(2025-01-15 12:30:00);
AdtGraphAt(failureTime)
| graph-match (failed)-[feed*1..10]->(affected)
  where failed.TwinId == "pump-01" and all(feed, Name == "feeds")
  project AffectedAsset=affected.TwinId
| distinct AffectedAsset

Connected Components

Identify isolated subgraphs at a specific time:

// Approximate connected components: group each twin with the set of
// twins reachable from it in either direction (bounded hop count)
AdtGraphAt(datetime(2025-01-15 10:00:00))
| graph-match (n1)-[e*1..10]-(n2)
  project Source=n1.TwinId, Reachable=n2.TwinId
| summarize Members=array_sort_asc(make_set(Reachable)) by Source
| project Component=tostring(Members[0]), Members=tostring(Members)
| distinct Component, Members

Vector Embeddings Across Time

If you're storing vector embeddings as twin properties (using PostgreSQL's pgvector), those embeddings are also historized. Kusto supports vector operations for similarity analysis.

Find Similar Historical States

// Get current embedding for a twin
let currentEmbedding = AdtPropertyEvents
    | where Id == "pump-01"
    | where Key == "maintenanceEmbedding"
    | summarize arg_max(TimeStamp, *) 
    | project Embedding=todynamic(Value);

// Find historical states with similar embeddings
AdtPropertyEvents
| where Key == "maintenanceEmbedding"
| where TimeStamp < ago(30d)
| extend EmbeddingArray = todynamic(Value)
| extend Similarity = series_cosine_similarity(
    EmbeddingArray, 
    toscalar(currentEmbedding))
| top 10 by Similarity desc
| project TimeStamp, Id, Similarity

Cluster Failure Modes by Embedding

Group historical failures into clusters based on embedding similarity:

AdtPropertyEvents
| where Key == "failureStateEmbedding"
| where TimeStamp > ago(180d)
| extend Embedding = todynamic(Value)
// Crude similarity grouping: bucket embeddings by hash (a stand-in for real clustering)
| summarize Occurrences=count(),
            Examples=make_set(Id, 3),
            LastSeen=max(TimeStamp)
    by Bucket = hash(tostring(Embedding), 1000) % 5
| order by Occurrences desc

Temporal Embedding Drift

Track how an asset's embedding has changed over time:

AdtPropertyEvents
| where Id == "pump-01"
| where Key == "stateEmbedding"
| order by TimeStamp asc
| extend PrevEmbedding = prev(todynamic(Value), 1)
| extend CurrentEmbedding = todynamic(Value)
| where isnotnull(PrevEmbedding)
| extend Similarity = series_cosine_similarity(CurrentEmbedding, PrevEmbedding)
| project TimeStamp, Similarity
| render timechart

Performance Optimization

Ingestion Performance

Streaming Ingestion (Low latency, <4 GB/hour):

.alter table AdtPropertyEvents policy streamingingestion enable
.alter table AdtTwinLifeCycleEvents policy streamingingestion enable
.alter table AdtRelationshipLifeCycleEvents policy streamingingestion enable
  • Latency: <10 seconds
  • Best for: Real-time dashboards, immediate queries

Batch Ingestion (High throughput, >4 GB/hour):

.alter table AdtPropertyEvents policy ingestionbatching 
@'{"MaximumBatchingTimeSpan":"00:00:30", "MaximumNumberOfItems": 10000, "MaximumRawDataSizeMB": 1024}'
  • Latency: 30 seconds - 15 minutes (configurable)
  • Best for: High-volume archival

Query Performance

Partition by Time:

.alter table AdtPropertyEvents policy partitioning 
@'{"PartitionKeys":[{"ColumnName":"TimeStamp","Kind":"UniformRange","Properties":{"Reference":"2025-01-01T00:00:00","RangeSize":"1.00:00:00","OverrideCreationTime":false}}]}'

Partitions data by day—dramatically speeds up time-range queries.

Materialized View for Current State:

.create materialized-view LatestPropertyValues on table AdtPropertyEvents
{
    AdtPropertyEvents
    | summarize arg_max(TimeStamp, *) by Id, Key
}

Pre-aggregates latest values—instant "current state" queries without full table scan.
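
Once the view exists, current-state lookups can target it directly instead of the raw table (using the view name defined above):

LatestPropertyValues
| where Id == "sensor-temp-01"
| project Key, Value, LastUpdate=TimeStamp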

Update Policy for Real-Time Aggregations:

// Create target table for aggregated metrics
.create table PropertyMetrics (
    Id: string,
    Key: string,
    Hour: datetime,
    AvgValue: real,
    MinValue: real,
    MaxValue: real
)

// Create update policy on the target table to aggregate on ingestion
// (the policy lives on the destination table; "Source" names the source table)
.alter table PropertyMetrics policy update 
@'[{"Source": "AdtPropertyEvents", "Query": "AdtPropertyEvents | where Key in (\"temperature\", \"pressure\") | summarize AvgValue=avg(todouble(Value)), MinValue=min(todouble(Value)), MaxValue=max(todouble(Value)) by Id, Key, Hour=bin(TimeStamp, 1h)", "IsEnabled": true, "IsTransactional": false}]'

Data Retention

Configure retention policies based on compliance and storage requirements:

Set Retention Period:

.alter-merge table AdtPropertyEvents policy retention 
@'{"SoftDeletePeriod":"365.00:00:00", "Recoverability":"Enabled"}'

Retains data for 365 days, then soft-deletes (recoverable for additional period).

Hot/Cold Cache:

.alter table AdtPropertyEvents policy caching hot = 30d

Keeps last 30 days in hot cache (fast queries), older data in cold storage (slower but cheaper).
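
To guarantee a query never reaches into cold storage (failing fast instead of running slowly), you can scope it to the hot cache explicitly:

set query_datascope = "hotcache";
AdtPropertyEvents
| where TimeStamp > ago(7d)
| summarize count() by Id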

Export Old Data to Data Lake:

.export async compressed to parquet (
    h@'https://storage.blob.core.windows.net/archive?<SAS>'
) with (namePrefix="history") <|
    AdtPropertyEvents
    | where TimeStamp < ago(365d)

Common Use Cases

Compliance Audit Trail

Requirement: Prove system state during inspection

let inspectionTime = datetime(2025-01-15 14:00:00);
AdtPropertyEvents
| where TimeStamp <= inspectionTime
| summarize arg_max(TimeStamp, *) by Id, Key
| where Id in ("critical-asset-01", "critical-asset-02")
| project Id, Key, Value, LastUpdate=TimeStamp
| order by Id, Key

Export to CSV/PDF for regulatory submission.
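
For the CSV part, the snapshot can be exported straight from Kusto to blob storage (a sketch; the container URL and SAS token are placeholders):

.export to csv (
    h@'https://storage.blob.core.windows.net/audit?<SAS>'
) with (namePrefix="inspection", includeHeaders="all") <|
    AdtPropertyEvents
    | where TimeStamp <= datetime(2025-01-15 14:00:00)
    | summarize arg_max(TimeStamp, *) by Id, Key
    | where Id in ("critical-asset-01", "critical-asset-02")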

Root Cause Analysis

Incident: Pressure spike at 10:45 AM

let incidentTime = datetime(2025-01-15 10:45:00);

// What changed in 5 minutes before incident?
AdtPropertyEvents
| where TimeStamp between ((incidentTime - 5m) .. incidentTime)
| where Key in ("pressure", "flowRate", "valvePosition")
| project TimeStamp, Id, Key, Value
| order by TimeStamp asc

Predictive Maintenance Pattern Matching

Find assets with degradation patterns similar to failed assets

// Build an hourly vibration series from a known failure (30d window before failure)
let failedPattern = toscalar(
    AdtPropertyEvents
    | where Id == "pump-failed-01"
    | where Key == "vibration"
    | where TimeStamp between (ago(30d) .. ago(1d))
    | summarize HourlyAvg=avg(todouble(Value)) by bin(TimeStamp, 1h)
    | order by TimeStamp asc
    | summarize make_list(HourlyAvg));

// Compare each asset's recent hourly series against the failure pattern
// (cosine similarity assumes comparable series lengths)
AdtPropertyEvents
| where Key == "vibration"
| where TimeStamp > ago(30d)
| summarize HourlyAvg=avg(todouble(Value)) by Id, Hour=bin(TimeStamp, 1h)
| order by Id asc, Hour asc
| summarize CurrentPattern=make_list(HourlyAvg) by Id
| extend Similarity = series_cosine_similarity(CurrentPattern, failedPattern)
| where Similarity > 0.85
| project Id, Similarity
| order by Similarity desc

AI Agent Historical Context

Agent query: "Has this anomaly type occurred before?"

// Current anomaly: Temperature spike to 95°C
let anomalyThreshold = 95.0;
AdtPropertyEvents
| where Key == "temperature"
| where todouble(Value) >= anomalyThreshold
| where TimeStamp < ago(1h)  // Historical only
| summarize 
    OccurrenceCount=count(),
    LastOccurrence=max(TimeStamp),
    AffectedTwins=dcount(Id),
    ExampleTwins=make_set(Id, 3)
| project OccurrenceCount, LastOccurrence, AffectedTwins, ExampleTwins

Agent uses this to inform decision: "Yes, occurred 3 times before, last 2 weeks ago."

Troubleshooting

No Data Appearing

Check ingestion status:

.show ingestion failures
| where Table in ("AdtPropertyEvents", "AdtTwinLifeCycleEvents", "AdtRelationshipLifeCycleEvents")
| top 10 by FailedOn desc

Verify event route:

  • Confirm route is active in ktrlplane
  • Check eventFormat is set to DataHistory
  • Verify sink name matches

Test service principal permissions:

az kusto database principal-assignment list \
  --cluster-name <cluster> \
  --database-name <database>

High Ingestion Latency

Switch to streaming ingestion:

.alter table AdtPropertyEvents policy streamingingestion enable

Reduce batching interval:

.alter table AdtPropertyEvents policy ingestionbatching 
@'{"MaximumBatchingTimeSpan":"00:00:10"}'

Wait 5-10 minutes for policy to take effect.
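
To confirm the change took effect, inspect the policy:

.show table AdtPropertyEvents policy ingestionbatching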

Slow Queries

Check if partitioning is enabled:

.show table AdtPropertyEvents policy partitioning

Review recent slow queries:

.show queries
| where Database == "<your-database>"
| where Duration > 10s
| project StartedOn, Text, Duration
| order by StartedOn desc

Use materialized views for frequent queries: See Query Performance section.
