
Import Jobs

How to use import jobs for bulk data ingestion with SAS tokens and cloud storage.

Import jobs allow you to bulk ingest data into Konnektr Graph using cloud storage blobs. This guide explains how import jobs work, how to use SAS tokens for Azure Blob Storage, and outlines future support for AWS S3 and Google Cloud Storage.

Overview

An import job is a background process that reads data from an input NDJSON blob (see NDJSON Format Requirements below) and writes results to an output blob. This is useful for bulk loading models, twins, and relationships.

Supported Storage Providers

  • Azure Blob Storage (SAS tokens): Supported in hosted and self-hosted deployments.
  • Managed Identity (Azure): Only available in self-hosted deployments.
  • AWS S3: Supported via S3 URIs and default AWS credentials or pre-signed URLs.
  • Google Cloud Storage: Supported via GCS URIs and default GCP credentials or signed URLs.

Using Azure Blob Storage (SAS Tokens)

For hosted deployments, import jobs require Azure Blob Storage URIs with SAS tokens. A SAS token grants temporary, scoped access to a blob without exposing your Azure account credentials; a sketch for generating SAS-based URIs follows the list below.

Example Blob URI:

https://<account>.blob.core.windows.net/<container>/<blob>?sv=...&sig=...
  • The SAS token must be included in the URI query string.
  • Both input and output blob URIs must be accessible with the provided SAS token.
  • The SAS token must grant read permission for input blobs and write permission for output blobs (use sp=r and/or sp=w in the SAS token).
  • No Azure credentials or managed identity are required for hosted mode.
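
If you generate SAS tokens programmatically, the following sketch shows one way to build SAS-protected blob URIs with the azure-storage-blob Python package. It assumes account-key access; the account, key, container, and blob names are placeholders.

generate_sas_uris.py
from datetime import datetime, timedelta, timezone
from azure.storage.blob import generate_blob_sas, BlobSasPermissions

account_name = "<account>"
account_key = "<account-key>"
container = "<container>"

def blob_uri_with_sas(blob_name, read=False, write=False, hours=2):
   # Create a SAS token scoped to a single blob with only the permissions needed.
   sas = generate_blob_sas(
      account_name=account_name,
      container_name=container,
      blob_name=blob_name,
      account_key=account_key,
      permission=BlobSasPermissions(read=read, write=write, create=write),
      expiry=datetime.now(timezone.utc) + timedelta(hours=hours),
   )
   return f"https://{account_name}.blob.core.windows.net/{container}/{blob_name}?{sas}"

input_blob_uri = blob_uri_with_sas("import.ndjson", read=True)            # read-only (sp=r)
output_blob_uri = blob_uri_with_sas("import-output.ndjson", write=True)   # create/write (sp=cw)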

Using AWS S3

Import jobs support AWS S3 URIs. You can use either the S3 URI format or HTTPS format:

Example S3 URIs:

s3://my-bucket/my-object.ndjson
https://my-bucket.s3.amazonaws.com/my-object.ndjson
  • The application uses default AWS credentials (IAM role, environment variables, or AWS config).
  • You may also use pre-signed URLs for temporary access.
  • The credentials or signed URL must grant read permission for input blobs and write permission for output blobs.
  • For append/resume, the service downloads the existing object, appends new data, and re-uploads.
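
If you prefer pre-signed URLs over default credentials, the following sketch generates them with boto3. The bucket and object names are placeholders.

presigned_urls.py
import boto3

s3 = boto3.client("s3")  # uses default AWS credentials (IAM role, environment variables, or config)
bucket = "my-bucket"

# Read access for the input object.
input_blob_uri = s3.generate_presigned_url(
   "get_object",
   Params={"Bucket": bucket, "Key": "import.ndjson"},
   ExpiresIn=3600,
)

# Write access for the output object.
output_blob_uri = s3.generate_presigned_url(
   "put_object",
   Params={"Bucket": bucket, "Key": "import-output.ndjson"},
   ExpiresIn=3600,
)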

Using Google Cloud Storage

Import jobs support Google Cloud Storage URIs. You can use either the GCS URI format or HTTPS format:

Example GCS URIs:

gs://my-bucket/my-object.ndjson
https://storage.googleapis.com/my-bucket/my-object.ndjson
  • The application uses default GCP credentials (service account, environment variables, or GCP config).
  • You may also use signed URLs for temporary access.
  • The credentials or signed URL must grant read permission for input blobs and write permission for output blobs.
  • For append/resume, the service downloads the existing object, appends new data, and re-uploads.
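
If you prefer signed URLs over default credentials, the following sketch generates V4 signed URLs with the google-cloud-storage package. It assumes the client is able to sign (for example, via a service account key); the bucket and object names are placeholders.

signed_urls.py
from datetime import timedelta
from google.cloud import storage

client = storage.Client()  # uses default GCP credentials (service account, environment variables)
bucket = client.bucket("my-bucket")

# Read access for the input object.
input_blob_uri = bucket.blob("import.ndjson").generate_signed_url(
   version="v4", expiration=timedelta(hours=1), method="GET"
)

# Write access for the output object.
output_blob_uri = bucket.blob("import-output.ndjson").generate_signed_url(
   version="v4", expiration=timedelta(hours=1), method="PUT"
)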

How Import Jobs Work

  1. Create an import job via the API, providing:
    • inputBlobUri: The source blob URI, including a SAS token or pre-signed/signed URL if you are not relying on default AWS/GCP credentials.
    • outputBlobUri: The destination blob URI, with the same access options as the input.
    • Additional job parameters as needed.
  2. The job runs in the background, reading from the input blob and writing results to the output blob.
  3. You can monitor job status, cancel, or resume jobs via the API. For S3 and GCS, resuming a job will append to the existing output object.
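
As a rough illustration, the sketch below creates and then polls an import job with the requests library. The base URL, authentication header, endpoint path, and job ID are assumptions for illustration only; inputBlobUri and outputBlobUri are the parameters described above. See the API Reference for the actual endpoints and request formats.

create_import_job.py
import requests

base_url = "https://<your-graph-instance>"        # placeholder
headers = {"Authorization": "Bearer <token>"}     # placeholder authentication

job = {
   # Blob URIs must already carry the SAS token or signed URL if one is required.
   "inputBlobUri": "https://<account>.blob.core.windows.net/<container>/import.ndjson?sv=...&sig=...",
   "outputBlobUri": "https://<account>.blob.core.windows.net/<container>/import-output.ndjson?sv=...&sig=...",
}

# Assumed endpoint shape -- consult the API Reference for the real path and method.
resp = requests.put(f"{base_url}/jobs/imports/my-import-job", json=job, headers=headers)
resp.raise_for_status()

# Poll the job status (again, the path is an assumption).
status = requests.get(f"{base_url}/jobs/imports/my-import-job", headers=headers).json()
print(status)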

Required Permissions

For all providers, you must ensure that:

  • Input blob: The credentials, SAS token, or signed URL must grant read access.
  • Output blob: The credentials, SAS token, or signed URL must grant write access.
  • For Azure SAS tokens, use the sp=r (read) and/or sp=w (write) permissions.
  • For AWS and GCS, ensure your IAM/service account policies allow the required operations.
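
For AWS, for example, the required operations typically map to s3:GetObject on the input object and s3:PutObject on the output object. An illustrative (not exhaustive) IAM policy statement, with the bucket name as a placeholder:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject"],
      "Resource": "arn:aws:s3:::my-bucket/*"
    }
  ]
}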

NDJSON Format Requirements

Import jobs require NDJSON files with a specific structure. Every line must be a single JSON object, and the sections must appear in the following order:

{"Section": "Header"}
{"fileVersion": "1.0.0", "author": "your-name", "organization": "your-org"}
{"Section": "Models"}
<DTDL model JSON, one per line>
{"Section": "Twins"}
<Twin JSON, one per line>
{"Section": "Relationships"}
<Relationship JSON, one per line>
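
For concreteness, a minimal complete file could look like the example below. The DTDL model and the twin/relationship field names ($dtId, $metadata, $sourceId, and so on) are illustrative assumptions; see the Migration Guide for authoritative samples that match your models.

{"Section": "Header"}
{"fileVersion": "1.0.0", "author": "jane", "organization": "contoso"}
{"Section": "Models"}
{"@id": "dtmi:example:Room;1", "@type": "Interface", "@context": "dtmi:dtdl:context;3", "displayName": "Room", "contents": [{"@type": "Relationship", "name": "adjacentTo", "target": "dtmi:example:Room;1"}]}
{"Section": "Twins"}
{"$dtId": "room1", "$metadata": {"$model": "dtmi:example:Room;1"}}
{"$dtId": "room2", "$metadata": {"$model": "dtmi:example:Room;1"}}
{"Section": "Relationships"}
{"$sourceId": "room1", "$relationshipId": "room1-adjacentTo-room2", "$targetId": "room2", "$relationshipName": "adjacentTo"}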

Formatting Guidelines:

  • All objects must be valid JSON and single-line (no embedded newlines).
  • The header section is required and should include metadata.
  • Models, twins, and relationships sections are optional but must follow the order above.
  • See the Migration Guide for more details and code samples.

Example NDJSON Generator (Python)

ndjson_generator.py
import json

def _write_line(f, obj):
   # Write one NDJSON line: serialize dicts, pass through pre-serialized JSON strings,
   # and make sure the result stays on a single line.
   if isinstance(obj, str):
      f.write(obj.strip().replace('\n', '') + '\n')
   else:
      f.write(json.dumps(obj) + '\n')

def write_ndjson(models, twins, relationships, file_path, header=None):
   with open(file_path, 'w', encoding='utf-8') as f:
      # Header section
      f.write(json.dumps({"Section": "Header"}) + '\n')
      if header is None:
         header = {"fileVersion": "1.0.0", "author": "authorName", "organization": "organization"}
      f.write(json.dumps(header) + '\n')

      # Models section (DTDL model JSON, one per line)
      f.write(json.dumps({"Section": "Models"}) + '\n')
      for model in models:
         _write_line(f, model)

      # Twins section
      f.write(json.dumps({"Section": "Twins"}) + '\n')
      for twin in twins:
         _write_line(f, twin)

      # Relationships section
      f.write(json.dumps({"Section": "Relationships"}) + '\n')
      for rel in relationships:
         _write_line(f, rel)

This function writes the NDJSON file in the required format. Pass lists of models, twins, and relationships as Python dicts or JSON strings. Each object is written on a single line.
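
For example, using the same illustrative Room model and twins as in the sample file above (all identifiers are placeholders):

model = {"@id": "dtmi:example:Room;1", "@type": "Interface", "@context": "dtmi:dtdl:context;3",
         "displayName": "Room", "contents": [{"@type": "Relationship", "name": "adjacentTo", "target": "dtmi:example:Room;1"}]}
twins = [{"$dtId": "room1", "$metadata": {"$model": "dtmi:example:Room;1"}},
         {"$dtId": "room2", "$metadata": {"$model": "dtmi:example:Room;1"}}]
rels = [{"$sourceId": "room1", "$relationshipId": "room1-adjacentTo-room2",
         "$targetId": "room2", "$relationshipName": "adjacentTo"}]

write_ndjson([model], twins, rels, "import.ndjson",
             header={"fileVersion": "1.0.0", "author": "jane", "organization": "contoso"})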

Note: Only NDJSON format is supported for import jobs. Do not use JSON-LD or other formats.


Future Support

  • Additional cloud providers and custom storage endpoints may be supported in future releases.

Notes

  • For self-hosted deployments, you may use managed identity for Azure Blob Storage if your environment supports it.
  • All blob URIs must be valid and accessible from the Konnektr Graph instance.
  • See the API Reference for endpoint details and request formats.

For questions or troubleshooting, see How-To Guides or reach out via GitHub Discussions.
