Working with Telemetry

Learn how to implement telemetry publishing and routing in your digital twins applications

This guide shows you how to implement telemetry functionality in your digital twins applications, from basic telemetry publishing to advanced event routing scenarios.

Prerequisites

  • AgeDigitalTwins SDK installed and configured
  • Digital twins created with appropriate DTDL models
  • Basic understanding of Components (for component telemetry)
  • Event routing configured (see Event Routing)

Basic Telemetry Publishing

Simple Telemetry

Start with basic telemetry publishing for individual digital twins:

using AgeDigitalTwins;

// Initialize the client
var client = new AgeDigitalTwinsClient(connectionString);

// Publish basic sensor data
await client.PublishTelemetryAsync("temperature-sensor-001", new {
    temperature = 23.5,
    humidity = 67.2,
    batteryLevel = 89,
    timestamp = DateTime.UtcNow
});

Component Telemetry

For complex digital twins with components, publish telemetry for specific components:

// Publish HVAC system telemetry
await client.PublishComponentTelemetryAsync(
    twinId: "building-floor-2",
    componentName: "hvac-system",
    telemetryData: new {
        setPointTemperature = 22.0,
        actualTemperature = 23.1,
        fanSpeed = 75,
        energyConsumption = 125.5,
        operatingMode = "Auto",
        timestamp = DateTime.UtcNow
    }
);

With Custom Message IDs

Use custom message IDs for tracking and deduplication:

// Generate a unique message ID
string messageId = $"sensor-001-{DateTime.UtcNow:yyyyMMddHHmmss}";

await client.PublishTelemetryAsync("sensor-001", new {
    temperature = 24.1,
    timestamp = DateTime.UtcNow
}, messageId: messageId);

Implementing Telemetry Sources

IoT Device Integration

Create a service to handle incoming IoT device data:

public class IoTTelemetryService
{
    private readonly AgeDigitalTwinsClient _client;
    private readonly ILogger<IoTTelemetryService> _logger;

    public IoTTelemetryService(AgeDigitalTwinsClient client, ILogger<IoTTelemetryService> logger)
    {
        _client = client;
        _logger = logger;
    }

    public async Task ProcessDeviceMessage(string deviceId, IoTDeviceMessage message)
    {
        try
        {
            // Map device ID to twin ID (implement your mapping logic)
            string twinId = await MapDeviceToTwin(deviceId);

            // Publish telemetry data
            await _client.PublishTelemetryAsync(twinId, new {
                temperature = message.Temperature,
                humidity = message.Humidity,
                pressure = message.Pressure,
                batteryLevel = message.BatteryLevel,
                signalStrength = message.SignalStrength,
                location = new {
                    latitude = message.Latitude,
                    longitude = message.Longitude
                },
                timestamp = message.Timestamp
            });

            _logger.LogInformation("Published telemetry for device {DeviceId} -> twin {TwinId}",
                deviceId, twinId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to publish telemetry for device {DeviceId}", deviceId);
        }
    }

    private async Task<string> MapDeviceToTwin(string deviceId)
    {
        // Implement your device-to-twin mapping logic
        // This could query your twins, use a lookup table, etc.
        return $"sensor-{deviceId}";
    }
}

Building Management System Integration

Integrate with existing building management systems:

public class BmsIntegrationService
{
    private readonly AgeDigitalTwinsClient _client;
    private readonly IExternalBmsClient _bmsClient;
    private readonly Timer _pollingTimer;

    public BmsIntegrationService(AgeDigitalTwinsClient client, IExternalBmsClient bmsClient)
    {
        _client = client;
        _bmsClient = bmsClient;

        // Poll BMS every 30 seconds
        _pollingTimer = new Timer(PollBmsData, null, TimeSpan.Zero, TimeSpan.FromSeconds(30));
    }

    private async void PollBmsData(object state)
    {
        try
        {
            var buildings = await _bmsClient.GetBuildingsAsync();

            foreach (var building in buildings)
            {
                // Publish HVAC telemetry
                if (building.HvacData != null)
                {
                    await _client.PublishComponentTelemetryAsync(
                        building.TwinId,
                        "hvac",
                        new {
                            setPointTemp = building.HvacData.SetPoint,
                            actualTemp = building.HvacData.ActualTemperature,
                            fanSpeed = building.HvacData.FanSpeedPercent,
                            energyUsage = building.HvacData.EnergyConsumption,
                            mode = building.HvacData.Mode,
                            timestamp = DateTime.UtcNow
                        }
                    );
                }

                // Publish lighting telemetry
                if (building.LightingData != null)
                {
                    await _client.PublishComponentTelemetryAsync(
                        building.TwinId,
                        "lighting",
                        new {
                            totalBrightness = building.LightingData.AverageBrightness,
                            energyUsage = building.LightingData.EnergyConsumption,
                            zonesActive = building.LightingData.ActiveZones,
                            motionDetected = building.LightingData.OccupancySensors.Any(x => x.Occupied),
                            timestamp = DateTime.UtcNow
                        }
                    );
                }
            }
        }
        catch (Exception ex)
        {
            // Log error but continue polling
            Console.WriteLine($"BMS polling error: {ex.Message}");
        }
    }
}

Telemetry Event Routing

Configure Event Sinks

Set up event routing to send telemetry to appropriate systems:

# appsettings.json or configuration
{
  "Events":
    {
      "Config":
        {
          "EventSinks":
            {
              "kafka":
                [
                  {
                    "name": "TelemetryKafka",
                    "brokerList": "your-kafka-broker:9092",
                    "topic": "digitaltwins-telemetry",
                    "saslMechanism": "PLAIN",
                    "saslUsername": "your-username",
                    "saslPassword": "your-password",
                  },
                ],
              "kusto":
                [
                  {
                    "name": "TelemetryAnalytics",
                    "ingestionUri": "https://ingest-your-cluster.region.kusto.windows.net",
                    "database": "DigitalTwinsAnalytics",
                  },
                ],
              "mqtt":
                [
                  {
                    "name": "IoTMqtt",
                    "broker": "your-mqtt-broker",
                    "port": 1883,
                    "topic": "digitaltwins/telemetry",
                    "clientId": "agedt-telemetry",
                  },
                ],
            },
          "EventRoutes":
            [
              { "eventType": "Telemetry", "sink": "TelemetryKafka" },
              {
                "eventType": "ComponentTelemetry",
                "sink": "TelemetryAnalytics",
              },
              { "eventType": "Telemetry", "sink": "IoTMqtt" },
            ],
        },
    },
}

Multiple Sink Routing

Route the same telemetry to multiple sinks for different purposes:

EventRoutes:
  # Send all telemetry to Kafka for real-time processing
  - eventType: "Telemetry"
    sink: "RealTimeKafka"
  - eventType: "ComponentTelemetry"
    sink: "RealTimeKafka"

  # Send all telemetry to Kusto for analytics
  - eventType: "Telemetry"
    sink: "AnalyticsKusto"
  - eventType: "ComponentTelemetry"
    sink: "AnalyticsKusto"

  # Send only component telemetry to MQTT for IoT integration
  - eventType: "ComponentTelemetry"
    sink: "IoTMqtt"

Advanced Scenarios

Batch Telemetry Publishing

For high-volume scenarios, consider batching telemetry data:

public class BatchTelemetryService
{
    private readonly AgeDigitalTwinsClient _client;
    private readonly ConcurrentQueue<TelemetryItem> _telemetryQueue;
    private readonly Timer _batchTimer;

    public BatchTelemetryService(AgeDigitalTwinsClient client)
    {
        _client = client;
        _telemetryQueue = new ConcurrentQueue<TelemetryItem>();

        // Process batches every 5 seconds
        _batchTimer = new Timer(ProcessBatch, null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));
    }

    public void QueueTelemetry(string twinId, object telemetryData, string componentName = null)
    {
        _telemetryQueue.Enqueue(new TelemetryItem
        {
            TwinId = twinId,
            Data = telemetryData,
            ComponentName = componentName,
            Timestamp = DateTime.UtcNow
        });
    }

    private async void ProcessBatch(object state)
    {
        var batch = new List<TelemetryItem>();

        // Dequeue up to 100 items
        while (batch.Count < 100 && _telemetryQueue.TryDequeue(out var item))
        {
            batch.Add(item);
        }

        if (batch.Count == 0) return;

        // Process batch concurrently
        var tasks = batch.Select(async item =>
        {
            try
            {
                if (string.IsNullOrEmpty(item.ComponentName))
                {
                    await _client.PublishTelemetryAsync(item.TwinId, item.Data);
                }
                else
                {
                    await _client.PublishComponentTelemetryAsync(item.TwinId, item.ComponentName, item.Data);
                }
            }
            catch (Exception ex)
            {
                // Log individual failures but don't stop batch processing
                Console.WriteLine($"Failed to publish telemetry for {item.TwinId}: {ex.Message}");
            }
        });

        await Task.WhenAll(tasks);
    }
}

public class TelemetryItem
{
    public string TwinId { get; set; }
    public object Data { get; set; }
    public string ComponentName { get; set; }
    public DateTime Timestamp { get; set; }
}

Error Handling and Retries

Implement robust error handling for telemetry publishing:

public class ResilientTelemetryService
{
    private readonly AgeDigitalTwinsClient _client;
    private readonly ILogger<ResilientTelemetryService> _logger;

    public async Task PublishTelemetryWithRetry(string twinId, object telemetryData, int maxRetries = 3)
    {
        for (int attempt = 1; attempt <= maxRetries; attempt++)
        {
            try
            {
                await _client.PublishTelemetryAsync(twinId, telemetryData);
                _logger.LogDebug("Successfully published telemetry for {TwinId} on attempt {Attempt}",
                    twinId, attempt);
                return;
            }
            catch (Exception ex) when (attempt < maxRetries)
            {
                _logger.LogWarning(ex, "Failed to publish telemetry for {TwinId} on attempt {Attempt}, retrying...",
                    twinId, attempt);

                // Exponential backoff
                await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, attempt)));
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to publish telemetry for {TwinId} after {MaxRetries} attempts",
                    twinId, maxRetries);
                throw;
            }
        }
    }
}

Performance Best Practices

High-Frequency Telemetry

For high-frequency telemetry scenarios:

  1. Use Async/Await Properly:

    // DON'T block on async operations
    client.PublishTelemetryAsync(twinId, data).Wait(); // ❌
    
    // DO use async/await
    await client.PublishTelemetryAsync(twinId, data); // ✅
  2. Consider Fire-and-Forget for Non-Critical Data:

    // For high-volume, non-critical telemetry
    _ = Task.Run(async () => {
        try
        {
            await client.PublishTelemetryAsync(twinId, data);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Telemetry publish failed for {TwinId}", twinId);
        }
    });
  3. Batch Related Data:

    // Instead of multiple calls
    await client.PublishTelemetryAsync(twinId, new { temperature = 23.5 });
    await client.PublishTelemetryAsync(twinId, new { humidity = 67.2 });
    
    // Combine into single call
    await client.PublishTelemetryAsync(twinId, new {
        temperature = 23.5,
        humidity = 67.2,
        timestamp = DateTime.UtcNow
    });

Monitoring and Diagnostics

Monitor your telemetry publishing:

public class TelemetryMetrics
{
    private long _totalPublished = 0;
    private long _totalFailed = 0;
    private readonly Timer _metricsTimer;

    public TelemetryMetrics()
    {
        _metricsTimer = new Timer(LogMetrics, null, TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
    }

    public void RecordSuccess() => Interlocked.Increment(ref _totalPublished);
    public void RecordFailure() => Interlocked.Increment(ref _totalFailed);

    private void LogMetrics(object state)
    {
        var published = Interlocked.Read(ref _totalPublished);
        var failed = Interlocked.Read(ref _totalFailed);

        Console.WriteLine($"Telemetry metrics - Published: {published}, Failed: {failed}, Success Rate: {(published / (double)(published + failed)):P2}");
    }
}

Troubleshooting

Common Issues

  1. Telemetry Not Appearing in Sinks

    • Check event routing configuration
    • Verify sink authentication and connectivity
    • Check telemetry event types match route configuration
  2. High Latency

    • Review batch processing settings
    • Check network connectivity to sinks
    • Consider using multiple sinks for load distribution
  3. Missing Telemetry Data

    • Implement proper error handling and logging
    • Check PostgreSQL NOTIFY/LISTEN configuration
    • Verify digital twin exists before publishing telemetry

Debugging Telemetry Flow

Enable detailed logging to debug telemetry issues:

// In your configuration
services.AddLogging(builder =>
{
    builder.AddConsole();
    builder.SetMinimumLevel(LogLevel.Debug);
});

// Add specific logging for telemetry
services.Configure<LoggerFilterOptions>(options =>
{
    options.AddFilter("AgeDigitalTwins.Events", LogLevel.Debug);
    options.AddFilter("AgeDigitalTwins.Telemetry", LogLevel.Debug);
});

See Also

Cookie Notice

We use cookies to enhance your browsing experience.