Amazon S3
Synopsis
The Amazon S3 device processes files from Amazon S3 buckets using SQS event notifications. This pull-type device consumes S3 event messages from an SQS queue, downloads the referenced objects from S3, and processes them through DataStream pipelines. The device supports multiple file formats, including JSON, JSONL, Parquet, and compressed archives.
Schema
```yaml
- id: <numeric>
  name: <string>
  description: <string>
  type: awss3
  tags: <string[]>
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    region: <string>
    access_key_id: <string>
    secret_access_key: <string>
    session_token: <string>
    queue_url: <string>
    role_arn: <string>
    timeout: <numeric>
    file_name_filter: <string>
    max_files_in_archive: <numeric>
    max_size_archive_bytes: <numeric>
    sqs_max_messages: <numeric>
    sqs_visibility_timeout: <numeric>
    sqs_wait_time_seconds: <numeric>
```
Configuration
Device
| Field | Required | Default | Description |
|---|---|---|---|
| id | Y | - | Unique numeric identifier |
| name | Y | - | Device name |
| description | N | - | Optional description of the device's purpose |
| type | Y | - | Device type identifier (must be awss3) |
| tags | N | - | Array of labels for categorization |
| pipelines | N | - | Array of preprocessing pipeline references |
| status | N | true | Boolean flag to enable/disable the device |
AWS Connection
| Field | Required | Default | Description |
|---|---|---|---|
| region | Y | - | AWS region where the S3 bucket and SQS queue are located |
| queue_url | Y | - | SQS queue URL that receives S3 event notifications |
| access_key_id | N | - | AWS access key ID for authentication |
| secret_access_key | N | - | AWS secret access key for authentication |
| session_token | N | - | AWS session token for temporary credentials |
| role_arn | N | - | AWS IAM role ARN to assume for cross-account access |
File Processing
| Field | Required | Default | Description |
|---|---|---|---|
| timeout | N | 10 | Polling interval in seconds between SQS queue checks (1-10) |
| file_name_filter | N | ".*" | Regular expression filter for S3 object keys |
| max_files_in_archive | N | 100 | Maximum number of files to process from an archive (0 = unlimited) |
| max_size_archive_bytes | N | 104857600 | Maximum total size of archive contents in bytes (0 = unlimited; the default is 100 MB) |
SQS Configuration
| Field | Required | Default | Description |
|---|---|---|---|
| sqs_max_messages | N | 1 | Maximum messages to receive per poll (1-10) |
| sqs_visibility_timeout | N | 30 | Message visibility timeout in seconds (0-43200) |
| sqs_wait_time_seconds | N | 20 | Long polling wait time in seconds (0-20) |
The device supports multiple authentication methods:
- IAM Role: Omit credentials to use the instance/task IAM role (recommended for EC2/ECS)
- Access Keys: Provide access_key_id and secret_access_key for static credentials
- Role Assumption: Provide role_arn to assume a different IAM role
- Temporary Credentials: Include session_token for temporary security credentials
Avoid hardcoding access_key_id and secret_access_key in plain text. Prefer IAM roles or reference encrypted secrets (environment variables, vault integrations) supported by DataStream. Rotate credentials regularly and restrict permissions to least privilege.
Details
The Amazon S3 device operates as an event-driven pull-type data source that processes S3 objects based on SQS notifications. The device continuously polls an SQS queue for S3 event messages, downloads the referenced objects, and processes their contents through the telemetry pipeline.
Event Processing Flow: The device receives S3 event notifications from SQS containing bucket name and object key information. For each ObjectCreated event (Put, Post, Copy, CompleteMultipartUpload), the device downloads the S3 object and processes it according to its file type. After successful processing, the SQS message is deleted to prevent reprocessing.
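For reference, an ObjectCreated notification delivered to the SQS queue looks roughly like the sample below, trimmed to the fields the flow above relies on; the bucket name and object key are placeholders:

```json
{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "awsRegion": "us-east-1",
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": { "name": "example-bucket" },
        "object": { "key": "logs/2024/app.jsonl", "size": 1024 }
      }
    }
  ]
}
```

The device reads the bucket name and object key from each record, downloads the object, and deletes the SQS message only after successful processing.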
File Format Detection: The device automatically detects file format based on file extension. Supported formats include .json (single JSON object), .jsonl (newline-delimited JSON), .parquet (columnar format), and compressed archives (.gz, .zip, .bz2, .tar). For .log and .txt files, the device performs content auto-detection by examining the first 8KB of data.
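The extension-based detection described above can be sketched as a simple lookup; this is an illustrative model only, not the device's internal implementation, and the format labels are assumptions:

```python
import os

# Extension-to-format map mirroring the formats listed above
# (illustrative; the device's actual detection logic is internal).
FORMATS = {
    ".json": "json",        # single JSON object
    ".jsonl": "jsonl",      # newline-delimited JSON
    ".parquet": "parquet",  # columnar format
    ".gz": "archive",
    ".zip": "archive",
    ".bz2": "archive",
    ".tar": "archive",
}

def detect_format(key: str) -> str:
    """Guess the processing format from an S3 object key's extension."""
    ext = os.path.splitext(key)[1].lower()
    if ext in (".log", ".txt"):
        # the device falls back to sampling the first 8 KB of content
        return "auto-detect"
    return FORMATS.get(ext, "unsupported")

print(detect_format("logs/app.jsonl"))  # jsonl
print(detect_format("dump/data.tar"))   # archive
print(detect_format("notes/app.log"))   # auto-detect
```

Keys with unrecognized extensions would be rejected as a permanent error rather than retried.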
Archive Processing: Compressed archives are automatically extracted and processed. The device supports nested archives and applies file name filtering to extracted contents. Archive processing is controlled by max_files_in_archive and max_size_archive_bytes limits to prevent resource exhaustion from maliciously large archives.
Error Handling: The device distinguishes between transient and permanent errors. Transient errors (network issues, throttling) leave the SQS message in the queue for automatic retry. Permanent errors (unsupported format, file name filter mismatch, archive size limits) delete the message to prevent infinite retry loops.
SQS Integration: The device uses SQS long polling to efficiently wait for new messages. The sqs_wait_time_seconds parameter enables long polling to reduce API calls and costs. Message visibility timeout ensures that failed processing attempts don't block other consumers.
Examples
Basic IAM Role Authentication
Configuring the Amazon S3 device to authenticate with an IAM role (recommended for EC2/ECS deployments).
The device polls the SQS queue for S3 events and processes CloudTrail log files using the instance IAM role.
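A configuration sketch for this scenario, based on the schema above; the queue URL and device name are placeholders, and credentials are omitted so the instance/task IAM role is used:

```yaml
- id: 1
  name: cloudtrail_s3
  type: awss3
  properties:
    region: "us-east-1"
    queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/cloudtrail-events"
```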
Access Key Authentication
Using static AWS access keys for authentication, with file name filtering.
The device processes only JSONL files under the prod/ prefix, ignoring other objects.
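A sketch for this scenario; the credentials shown are AWS's documented example values, and the queue URL is a placeholder. Prefer secret references over plain-text keys, as noted above:

```yaml
- id: 2
  name: prod_jsonl_s3
  type: awss3
  properties:
    region: "us-east-1"
    queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/prod-events"
    access_key_id: "AKIAIOSFODNN7EXAMPLE"
    secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
    file_name_filter: '^prod/.*\.jsonl$'
```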
Cross-Account Role Assumption
Assuming an IAM role in a different AWS account for cross-account S3 access.
The device assumes the specified role to access S3 buckets in the partner account.
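A sketch for cross-account access; the role ARN, account IDs, and queue URL are placeholders:

```yaml
- id: 3
  name: partner_s3
  type: awss3
  properties:
    region: "eu-west-1"
    queue_url: "https://sqs.eu-west-1.amazonaws.com/123456789012/partner-events"
    role_arn: "arn:aws:iam::999999999999:role/PartnerS3ReadOnly"
```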
High-Volume Processing
Optimizing for high-volume S3 data processing with batch message retrieval.
The device retrieves up to 10 messages per poll with a 5-minute visibility timeout for efficient high-volume processing.
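A sketch matching the description above (10 messages per poll, 300-second visibility timeout, maximum long-polling wait); the queue URL is a placeholder:

```yaml
- id: 4
  name: high_volume_s3
  type: awss3
  properties:
    region: "us-east-1"
    queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/firehose-events"
    sqs_max_messages: 10
    sqs_visibility_timeout: 300
    sqs_wait_time_seconds: 20
```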
Parquet File Processing
Processing Parquet files from S3 for analytics data ingestion.
The device processes only Parquet files, using the columnar format for efficient handling of large datasets.
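A sketch restricting processing to Parquet objects via file_name_filter; the queue URL and names are placeholders:

```yaml
- id: 5
  name: analytics_parquet
  type: awss3
  properties:
    region: "us-east-1"
    queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/analytics-events"
    file_name_filter: '\.parquet$'
```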
Archive Processing with Limits
Processing compressed archives with safety limits to prevent resource exhaustion.
The device extracts and processes compressed archives, limited to 50 files and 50 MB of total extracted size.
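A sketch applying the archive limits described above (50 files, 52428800 bytes = 50 MB); the queue URL is a placeholder:

```yaml
- id: 6
  name: archive_s3
  type: awss3
  properties:
    region: "us-east-1"
    queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/archive-events"
    max_files_in_archive: 50
    max_size_archive_bytes: 52428800
```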
Pipeline Integration
Integrating the S3 device with preprocessing pipelines for data transformation.
S3 data is processed through normalization and enrichment pipelines before routing to targets.
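A sketch attaching preprocessing pipelines at the device level; the pipeline names (normalize, enrich) and queue URL are hypothetical placeholders and must match pipelines defined elsewhere in the configuration:

```yaml
- id: 7
  name: pipelined_s3
  type: awss3
  pipelines:
    - normalize
    - enrich
  properties:
    region: "us-east-1"
    queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/s3-events"
```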