Version: 1.3.0

Azure Blob Storage

Synopsis

The Azure Blob Storage device is a pull-type device that reads and processes files from Azure Blob Storage containers and feeds them through DataStream pipelines. It retrieves files in JSON, JSONL, or Parquet format and supports both connection string and service principal authentication.

Schema

- id: <numeric>
  name: <string>
  description: <string>
  type: azblob
  tags: <string[]>
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    connection_string: <string>
    container_name: <string>
    tenant_id: <string>
    client_id: <string>
    client_secret: <string>
    account: <string>
    path_prefix: <string>
    file_format: <string>
    batch_size: <number>
    poll_interval: <number>
    delete_after_processing: <boolean>
    max_concurrent_files: <number>

Configuration

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| id | numeric | Y | - | Unique numeric identifier |
| name | string | Y | - | Device name |
| description | string | N | - | Optional description of the device's purpose |
| type | string | Y | - | Device type identifier (must be azblob) |
| tags | string[] | N | - | Array of labels for categorization |
| pipelines | pipeline[] | N | - | Array of preprocessing pipeline references |
| status | boolean | N | true | Boolean flag to enable/disable the device |
| connection_string | string | Y* | - | Azure storage account connection string for authentication |
| container_name | string | Y | - | Name of the Azure Blob Storage container to read from |
| tenant_id | string | Y* | - | Azure tenant ID for service principal authentication |
| client_id | string | Y* | - | Azure client ID for service principal authentication |
| client_secret | string | Y* | - | Azure client secret for service principal authentication |
| account | string | Y* | - | Azure storage account name for service principal authentication |
| path_prefix | string | N | "" | Path prefix filter to limit which files are processed |
| file_format | string | N | json | File format to expect: json, jsonl, or parquet |
| batch_size | number | N | 1000 | Number of records to process in each batch |
| poll_interval | number | N | 60 | Interval in seconds between container polling cycles |
| delete_after_processing | boolean | N | false | Whether to delete files after successful processing |
| max_concurrent_files | number | N | 5 | Maximum number of files to process concurrently |

* = Conditionally required (see authentication methods below)

Authentication Methods

Choose either connection string OR service principal authentication; minimal sketches of both follow the list:

  • Connection String: Requires connection_string and container_name
  • Service Principal: Requires tenant_id, client_id, client_secret, account, and container_name
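As a quick reference, the two methods differ only in the properties they require; every value below is a placeholder:

# Connection string authentication (placeholder values)
properties:
  connection_string: "<storage-account-connection-string>"
  container_name: "<container>"

# Service principal authentication (placeholder values)
properties:
  tenant_id: "<tenant-id>"
  client_id: "<client-id>"
  client_secret: "<client-secret>"
  account: "<storage-account-name>"
  container_name: "<container>"
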
Secrets management

Avoid hardcoding connection_string and client_secret in plain text. Prefer referencing encrypted secrets (e.g., environment variables, vault integrations, or secret files) supported by DataStream. Rotate credentials regularly and restrict scope/permissions to least privilege.
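For illustration only, a device definition might pull these values from the environment instead of embedding them; the ${VAR} expansion syntax below is an assumption, not a documented DataStream feature, so substitute whichever secret-reference mechanism your deployment actually supports:

properties:
  # Hypothetical: resolved from an environment variable or vault at load time
  connection_string: "${AZBLOB_CONNECTION_STRING}"
  container_name: "logs"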

Details

The Azure Blob Storage device operates as a pull-type data source that periodically scans Azure storage containers for new files. The device supports multiple file formats and provides flexible authentication options for enterprise environments.

File Format Processing: The device processes files according to the configured file_format. JSON files are parsed as individual objects, JSONL files are read line by line with each line treated as a separate record, and Parquet files are read using columnar processing for efficient handling of large datasets.
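For illustration (the field names are arbitrary), the two text formats differ as follows.

A sample .json file, parsed as one object:

{"level": "INFO", "message": "Application started"}

A sample .jsonl file, where each line becomes a separate record:

{"level": "INFO", "message": "Application started"}
{"level": "WARN", "message": "Cache miss"}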

Polling Behavior: The device maintains state to track processed files and only processes new or modified files during each polling cycle. The polling interval can be adjusted based on data arrival patterns and processing requirements.
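For example, the interval might be shortened when files land continuously and lengthened for scheduled batch drops; the values below are illustrative starting points only:

# Files arrive continuously: poll every minute (the default)
properties:
  poll_interval: 60

# Files are uploaded roughly once per hour: poll every 30 minutes
properties:
  poll_interval: 1800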

Concurrent Processing: Multiple files can be processed simultaneously to improve throughput. The concurrency level is configurable and should be tuned based on available system resources and storage account limits.
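The limit is set with max_concurrent_files (default 5); the values below are illustrative only, and higher values should stay within your storage account's throttling limits:

# Conservative: two files at a time on a resource-constrained node
properties:
  max_concurrent_files: 2

# Higher throughput where system resources and account limits allow
properties:
  max_concurrent_files: 8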

Error Handling: Files that fail processing are marked and can be retried on subsequent polling cycles. The device provides detailed logging for troubleshooting connection and processing issues.

Examples

Basic Connection String Authentication

Configuring the Azure Blob Storage device with connection string authentication to process JSON files:

- id: 1
  name: blob-json-processor
  type: azblob
  properties:
    connection_string: "DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=key123;EndpointSuffix=core.windows.net"
    container_name: "logs"
    file_format: "json"
    poll_interval: 300

The device polls the logs container every 5 minutes and processes each JSON file as individual records:

{
  "timestamp": "2024-01-15T10:30:00Z",
  "level": "INFO",
  "message": "Application started",
  "source_file": "app-logs-2024-01-15.json",
  "container": "logs"
}

Service Principal Authentication

Using service principal authentication for enterprise security compliance:

- id: 2
  name: enterprise-blob-reader
  type: azblob
  properties:
    tenant_id: "12345678-1234-1234-1234-123456789abc"
    client_id: "87654321-4321-4321-4321-cba987654321"
    client_secret: "your-client-secret"
    account: "enterprisestorage"
    container_name: "security-logs"
    file_format: "jsonl"
    path_prefix: "prod/"

The service principal provides enterprise-grade authentication, and the path prefix restricts ingestion to production logs only:

{
  "event_type": "authentication",
  "user_id": "user123",
  "timestamp": "2024-01-15T10:30:00Z",
  "source_file": "prod/auth-events-2024-01-15.jsonl"
}

High-Volume Parquet Processing

Processing large Parquet files with settings optimized for high-volume data:

- id: 3
  name: parquet-bulk-processor
  type: azblob
  properties:
    connection_string: "DefaultEndpointsProtocol=https;AccountName=datawarehouse;AccountKey=key456"
    container_name: "analytics"
    file_format: "parquet"
    batch_size: 10000
    max_concurrent_files: 3
    poll_interval: 1800
    delete_after_processing: true

The settings favor large Parquet files, with batching and automatic cleanup after successful processing:

{
  "record_id": "rec_001",
  "metric_value": 42.5,
  "timestamp": "2024-01-15T10:30:00Z",
  "batch_info": {
    "file": "analytics/metrics-2024-01-15.parquet",
    "batch_size": 10000
  }
}

Pipeline Processing

Integrating the blob storage device with preprocessing pipelines for data transformation:

- id: 4
  name: blob-with-pipeline
  type: azblob
  tags:
    - "azure_storage"
    - "raw_data"
  pipelines:
    - timestamp-normalization
    - field-enrichment
  properties:
    connection_string: "DefaultEndpointsProtocol=https;AccountName=rawdata;AccountKey=key789"
    container_name: "raw-logs"
    file_format: "json"

Raw blob data is processed through pipelines for timestamp normalization and field enrichment before routing to targets:

{
  "timestamp": "2024-01-15T10:30:00.000Z",
  "level": "INFO",
  "message": "User login successful",
  "enriched_data": {
    "normalized_timestamp": "2024-01-15T10:30:00Z",
    "severity_level": 6,
    "source_container": "raw-logs"
  }
}

Path-Based File Organization

Using path prefixes to organize and process files from specific subdirectories:

- id: 5
  name: organized-blob-reader
  type: azblob
  properties:
    connection_string: "DefaultEndpointsProtocol=https;AccountName=organized;AccountKey=keyABC"
    container_name: "structured-data"
    path_prefix: "2024/01/security/"
    file_format: "jsonl"
    poll_interval: 600

The device only processes files under the configured path prefix, enabling organized data ingestion patterns:

{
  "security_event": "failed_login",
  "user": "user456",
  "timestamp": "2024-01-15T10:30:00Z",
  "file_path": "2024/01/security/failed-logins-15.jsonl",
  "path_metadata": {
    "year": "2024",
    "month": "01",
    "category": "security"
  }
}

Error Recovery Configuration

Configuring robust error handling with retry logic and processing-state management:

- id: 6
  name: resilient-blob-reader
  type: azblob
  properties:
    connection_string: "DefaultEndpointsProtocol=https;AccountName=resilient;AccountKey=keyXYZ"
    container_name: "critical-data"
    file_format: "json"
    poll_interval: 120
    max_concurrent_files: 2
    delete_after_processing: false

Conservative settings preserve files after processing and limit concurrency for stable processing of critical data:

{
  "critical_event": "system_alert",
  "severity": "high",
  "timestamp": "2024-01-15T10:30:00Z",
  "processing_info": {
    "file_preserved": true,
    "retry_count": 0,
    "processing_status": "success"
  }
}