Working with Telemetry
Learn how to implement telemetry publishing and routing in your digital twins applications
This guide shows you how to implement telemetry functionality in your digital twins applications, from basic telemetry publishing to advanced event routing scenarios.
Prerequisites
- AgeDigitalTwins SDK installed and configured
- Digital twins created with appropriate DTDL models
- Basic understanding of Components (for component telemetry)
- Event routing configured (see Event Routing)
Basic Telemetry Publishing
Simple Telemetry
Start with basic telemetry publishing for individual digital twins:
using AgeDigitalTwins;
// Initialize the client
var client = new AgeDigitalTwinsClient(connectionString);
// Publish basic sensor data
await client.PublishTelemetryAsync("temperature-sensor-001", new {
temperature = 23.5,
humidity = 67.2,
batteryLevel = 89,
timestamp = DateTime.UtcNow
});
Component Telemetry
For complex digital twins with components, publish telemetry for specific components:
// Publish HVAC system telemetry
await client.PublishComponentTelemetryAsync(
twinId: "building-floor-2",
componentName: "hvac-system",
telemetryData: new {
setPointTemperature = 22.0,
actualTemperature = 23.1,
fanSpeed = 75,
energyConsumption = 125.5,
operatingMode = "Auto",
timestamp = DateTime.UtcNow
}
);
With Custom Message IDs
Use custom message IDs for tracking and deduplication:
// Generate a unique message ID
string messageId = $"sensor-001-{DateTime.UtcNow:yyyyMMddHHmmss}";
await client.PublishTelemetryAsync("sensor-001", new {
temperature = 24.1,
timestamp = DateTime.UtcNow
}, messageId: messageId);
Implementing Telemetry Sources
IoT Device Integration
Create a service to handle incoming IoT device data:
public class IoTTelemetryService
{
private readonly AgeDigitalTwinsClient _client;
private readonly ILogger<IoTTelemetryService> _logger;
public IoTTelemetryService(AgeDigitalTwinsClient client, ILogger<IoTTelemetryService> logger)
{
_client = client;
_logger = logger;
}
public async Task ProcessDeviceMessage(string deviceId, IoTDeviceMessage message)
{
try
{
// Map device ID to twin ID (implement your mapping logic)
string twinId = await MapDeviceToTwin(deviceId);
// Publish telemetry data
await _client.PublishTelemetryAsync(twinId, new {
temperature = message.Temperature,
humidity = message.Humidity,
pressure = message.Pressure,
batteryLevel = message.BatteryLevel,
signalStrength = message.SignalStrength,
location = new {
latitude = message.Latitude,
longitude = message.Longitude
},
timestamp = message.Timestamp
});
_logger.LogInformation("Published telemetry for device {DeviceId} -> twin {TwinId}",
deviceId, twinId);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to publish telemetry for device {DeviceId}", deviceId);
}
}
private async Task<string> MapDeviceToTwin(string deviceId)
{
// Implement your device-to-twin mapping logic
// This could query your twins, use a lookup table, etc.
return $"sensor-{deviceId}";
}
}
Building Management System Integration
Integrate with existing building management systems:
public class BmsIntegrationService
{
private readonly AgeDigitalTwinsClient _client;
private readonly IExternalBmsClient _bmsClient;
private readonly Timer _pollingTimer;
public BmsIntegrationService(AgeDigitalTwinsClient client, IExternalBmsClient bmsClient)
{
_client = client;
_bmsClient = bmsClient;
// Poll BMS every 30 seconds
_pollingTimer = new Timer(PollBmsData, null, TimeSpan.Zero, TimeSpan.FromSeconds(30));
}
private async void PollBmsData(object state)
{
try
{
var buildings = await _bmsClient.GetBuildingsAsync();
foreach (var building in buildings)
{
// Publish HVAC telemetry
if (building.HvacData != null)
{
await _client.PublishComponentTelemetryAsync(
building.TwinId,
"hvac",
new {
setPointTemp = building.HvacData.SetPoint,
actualTemp = building.HvacData.ActualTemperature,
fanSpeed = building.HvacData.FanSpeedPercent,
energyUsage = building.HvacData.EnergyConsumption,
mode = building.HvacData.Mode,
timestamp = DateTime.UtcNow
}
);
}
// Publish lighting telemetry
if (building.LightingData != null)
{
await _client.PublishComponentTelemetryAsync(
building.TwinId,
"lighting",
new {
totalBrightness = building.LightingData.AverageBrightness,
energyUsage = building.LightingData.EnergyConsumption,
zonesActive = building.LightingData.ActiveZones,
motionDetected = building.LightingData.OccupancySensors.Any(x => x.Occupied),
timestamp = DateTime.UtcNow
}
);
}
}
}
catch (Exception ex)
{
// Log error but continue polling
Console.WriteLine($"BMS polling error: {ex.Message}");
}
}
}
Telemetry Event Routing
Configure Event Sinks
Set up event routing to send telemetry to appropriate systems:
# appsettings.json or configuration
{
"Events":
{
"Config":
{
"EventSinks":
{
"kafka":
[
{
"name": "TelemetryKafka",
"brokerList": "your-kafka-broker:9092",
"topic": "digitaltwins-telemetry",
"saslMechanism": "PLAIN",
"saslUsername": "your-username",
"saslPassword": "your-password",
},
],
"kusto":
[
{
"name": "TelemetryAnalytics",
"ingestionUri": "https://ingest-your-cluster.region.kusto.windows.net",
"database": "DigitalTwinsAnalytics",
},
],
"mqtt":
[
{
"name": "IoTMqtt",
"broker": "your-mqtt-broker",
"port": 1883,
"topic": "digitaltwins/telemetry",
"clientId": "agedt-telemetry",
},
],
},
"EventRoutes":
[
{ "eventType": "Telemetry", "sink": "TelemetryKafka" },
{
"eventType": "ComponentTelemetry",
"sink": "TelemetryAnalytics",
},
{ "eventType": "Telemetry", "sink": "IoTMqtt" },
],
},
},
}
Multiple Sink Routing
Route the same telemetry to multiple sinks for different purposes:
EventRoutes:
# Send all telemetry to Kafka for real-time processing
- eventType: "Telemetry"
sink: "RealTimeKafka"
- eventType: "ComponentTelemetry"
sink: "RealTimeKafka"
# Send all telemetry to Kusto for analytics
- eventType: "Telemetry"
sink: "AnalyticsKusto"
- eventType: "ComponentTelemetry"
sink: "AnalyticsKusto"
# Send only component telemetry to MQTT for IoT integration
- eventType: "ComponentTelemetry"
sink: "IoTMqtt"
Advanced Scenarios
Batch Telemetry Publishing
For high-volume scenarios, consider batching telemetry data:
public class BatchTelemetryService
{
private readonly AgeDigitalTwinsClient _client;
private readonly ConcurrentQueue<TelemetryItem> _telemetryQueue;
private readonly Timer _batchTimer;
public BatchTelemetryService(AgeDigitalTwinsClient client)
{
_client = client;
_telemetryQueue = new ConcurrentQueue<TelemetryItem>();
// Process batches every 5 seconds
_batchTimer = new Timer(ProcessBatch, null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));
}
public void QueueTelemetry(string twinId, object telemetryData, string componentName = null)
{
_telemetryQueue.Enqueue(new TelemetryItem
{
TwinId = twinId,
Data = telemetryData,
ComponentName = componentName,
Timestamp = DateTime.UtcNow
});
}
private async void ProcessBatch(object state)
{
var batch = new List<TelemetryItem>();
// Dequeue up to 100 items
while (batch.Count < 100 && _telemetryQueue.TryDequeue(out var item))
{
batch.Add(item);
}
if (batch.Count == 0) return;
// Process batch concurrently
var tasks = batch.Select(async item =>
{
try
{
if (string.IsNullOrEmpty(item.ComponentName))
{
await _client.PublishTelemetryAsync(item.TwinId, item.Data);
}
else
{
await _client.PublishComponentTelemetryAsync(item.TwinId, item.ComponentName, item.Data);
}
}
catch (Exception ex)
{
// Log individual failures but don't stop batch processing
Console.WriteLine($"Failed to publish telemetry for {item.TwinId}: {ex.Message}");
}
});
await Task.WhenAll(tasks);
}
}
public class TelemetryItem
{
public string TwinId { get; set; }
public object Data { get; set; }
public string ComponentName { get; set; }
public DateTime Timestamp { get; set; }
}
Error Handling and Retries
Implement robust error handling for telemetry publishing:
public class ResilientTelemetryService
{
private readonly AgeDigitalTwinsClient _client;
private readonly ILogger<ResilientTelemetryService> _logger;
public async Task PublishTelemetryWithRetry(string twinId, object telemetryData, int maxRetries = 3)
{
for (int attempt = 1; attempt <= maxRetries; attempt++)
{
try
{
await _client.PublishTelemetryAsync(twinId, telemetryData);
_logger.LogDebug("Successfully published telemetry for {TwinId} on attempt {Attempt}",
twinId, attempt);
return;
}
catch (Exception ex) when (attempt < maxRetries)
{
_logger.LogWarning(ex, "Failed to publish telemetry for {TwinId} on attempt {Attempt}, retrying...",
twinId, attempt);
// Exponential backoff
await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, attempt)));
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to publish telemetry for {TwinId} after {MaxRetries} attempts",
twinId, maxRetries);
throw;
}
}
}
}
Performance Best Practices
High-Frequency Telemetry
For high-frequency telemetry scenarios:
-
Use Async/Await Properly:
// DON'T block on async operations client.PublishTelemetryAsync(twinId, data).Wait(); // ❌ // DO use async/await await client.PublishTelemetryAsync(twinId, data); // ✅
-
Consider Fire-and-Forget for Non-Critical Data:
// For high-volume, non-critical telemetry _ = Task.Run(async () => { try { await client.PublishTelemetryAsync(twinId, data); } catch (Exception ex) { _logger.LogWarning(ex, "Telemetry publish failed for {TwinId}", twinId); } });
-
Batch Related Data:
// Instead of multiple calls await client.PublishTelemetryAsync(twinId, new { temperature = 23.5 }); await client.PublishTelemetryAsync(twinId, new { humidity = 67.2 }); // Combine into single call await client.PublishTelemetryAsync(twinId, new { temperature = 23.5, humidity = 67.2, timestamp = DateTime.UtcNow });
Monitoring and Diagnostics
Monitor your telemetry publishing:
public class TelemetryMetrics
{
private long _totalPublished = 0;
private long _totalFailed = 0;
private readonly Timer _metricsTimer;
public TelemetryMetrics()
{
_metricsTimer = new Timer(LogMetrics, null, TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
}
public void RecordSuccess() => Interlocked.Increment(ref _totalPublished);
public void RecordFailure() => Interlocked.Increment(ref _totalFailed);
private void LogMetrics(object state)
{
var published = Interlocked.Read(ref _totalPublished);
var failed = Interlocked.Read(ref _totalFailed);
Console.WriteLine($"Telemetry metrics - Published: {published}, Failed: {failed}, Success Rate: {(published / (double)(published + failed)):P2}");
}
}
Troubleshooting
Common Issues
-
Telemetry Not Appearing in Sinks
- Check event routing configuration
- Verify sink authentication and connectivity
- Check telemetry event types match route configuration
-
High Latency
- Review batch processing settings
- Check network connectivity to sinks
- Consider using multiple sinks for load distribution
-
Missing Telemetry Data
- Implement proper error handling and logging
- Check PostgreSQL NOTIFY/LISTEN configuration
- Verify digital twin exists before publishing telemetry
Debugging Telemetry Flow
Enable detailed logging to debug telemetry issues:
// In your configuration
services.AddLogging(builder =>
{
builder.AddConsole();
builder.SetMinimumLevel(LogLevel.Debug);
});
// Add specific logging for telemetry
services.Configure<LoggerFilterOptions>(options =>
{
options.AddFilter("AgeDigitalTwins.Events", LogLevel.Debug);
options.AddFilter("AgeDigitalTwins.Telemetry", LogLevel.Debug);
});
See Also
- Telemetry Concepts - Understanding telemetry in AgeDigitalTwins
- Event Routing - Configuring event routing and sinks
- Components - Working with digital twin components
- Troubleshooting Guide - Common issues and solutions