Amazon MSK
Synopsis
Creates a target that writes log messages to Amazon Managed Streaming for Apache Kafka (MSK) topics with support for batching, compression, and AWS authentication methods. The target handles message delivery efficiently with configurable batch limits based on size or event count. Amazon MSK is a fully managed Apache Kafka service on AWS.
Schema
- name: <string>
  description: <string>
  type: amazonmsk
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    address: <string>
    port: <numeric>
    client_id: <string>
    topic: <string>
    algorithm: <string>
    username: <string>
    password: <string>
    compression: <string>
    compression_level: <string>
    acknowledgments: <string>
    allow_auto_topic_creation: <boolean>
    disable_idempotent_write: <boolean>
    max_bytes: <numeric>
    max_events: <numeric>
    field_format: <string>
    tls:
      status: <boolean>
      insecure_skip_verify: <boolean>
      min_tls_version: <string>
      max_tls_version: <string>
      cert_name: <string>
      key_name: <string>
      passphrase: <string>
    interval: <string|numeric>
    cron: <string>
Configuration
The following fields are used to define the target:
| Field | Required | Default | Description | 
|---|---|---|---|
name | Y | Target name | |
description | N | - | Optional description | 
type | Y | Must be amazonmsk | |
pipelines | N | - | Optional post-processor pipelines | 
status | N | true | Enable/disable the target | 
Amazon MSK Connection
| Field | Required | Default | Description | 
|---|---|---|---|
address | Y | - | MSK broker bootstrap server address (e.g., b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com) | 
port | N | 9092 | MSK broker port (9092 for plaintext, 9094 for SASL/SCRAM, 9098 for IAM) | 
client_id | N | - | Client identifier for connection tracking | 
topic | Y | - | Kafka topic name for message delivery | 
Authentication
| Field | Required | Default | Description | 
|---|---|---|---|
algorithm | N | "none" | Authentication mechanism: none, plain, scram-sha-256, scram-sha-512 | 
username | N* | - | Username for SASL authentication | 
password | N* | - | Password for SASL authentication | 
* = Conditionally required when using SASL authentication.
For IAM authentication, use AWS IAM roles and policies. IAM authentication is configured at the AWS MSK cluster level and does not require username/password in the target configuration.
Producer Settings
| Field | Required | Default | Description | 
|---|---|---|---|
compression | N | "none" | Message compression: none, gzip, snappy, lz4, zstd | 
compression_level | N | - | Compression level (algorithm-specific) | 
acknowledgments | N | "leader" | Acknowledgment level: none, leader, all | 
allow_auto_topic_creation | N | false | Allow automatic topic creation if topic doesn't exist | 
disable_idempotent_write | N | false | Disable idempotent producer (not recommended) | 
Batch Configuration
| Field | Required | Default | Description | 
|---|---|---|---|
max_bytes | N | 0 | Maximum batch size in bytes (0 = unlimited) | 
max_events | N | 1000 | Maximum number of events per batch | 
field_format | N | - | Data normalization format. See applicable Normalization section | 
Batches are sent when either max_bytes or max_events limit is reached, whichever comes first.
TLS Configuration
| Field | Required | Default | Description | 
|---|---|---|---|
tls.status | N | false | Enable TLS/SSL encryption | 
tls.insecure_skip_verify | N | false | Skip certificate verification (not recommended) | 
tls.min_tls_version | N | "tls1.2" | Minimum TLS version: tls1.2, tls1.3 | 
tls.max_tls_version | N | "tls1.3" | Maximum TLS version: tls1.2, tls1.3 | 
tls.cert_name | N | "cert.pem" | Client certificate file name for mTLS | 
tls.key_name | N | "key.pem" | Private key file name for mTLS | 
tls.passphrase | N | - | Passphrase for encrypted private key | 
Scheduler
| Field | Required | Default | Description | 
|---|---|---|---|
interval | N | realtime | Execution frequency. See Interval for details | 
cron | N | - | Cron expression for scheduled execution. See Cron for details | 
Details
Amazon MSK is a fully managed Apache Kafka service on AWS. This target type allows you to connect to MSK clusters using the standard Kafka protocol.
Authentication Methods
Amazon MSK supports multiple authentication methods:
Unauthenticated Access (plaintext)
- No authentication required
 - Port: 9092
 - Set 
algorithm: "none" - Not recommended for production
 
SASL/SCRAM Authentication
- Username/password authentication stored in AWS Secrets Manager
 - Port: 9094
 - Supported algorithms: 
scram-sha-256,scram-sha-512 - TLS encryption recommended
 
IAM Authentication
- Uses AWS IAM roles and policies for authentication
 - Port: 9098
 - Configured at the AWS level, not in target configuration
 - Requires AWS credentials configured on the DataStream host
 - TLS encryption required
 
Connection Requirements
MSK cluster bootstrap servers are available in the AWS MSK Console. The address format is typically:
b-1.clustername.xxxxxx.kafka.region.amazonaws.com
For multi-broker clusters, you can use any broker from the bootstrap server list.
TLS Configuration
MSK supports both plaintext and TLS-encrypted connections:
- Plaintext: Port 9092 (unauthenticated) or 9094 (SASL/SCRAM)
 - TLS: Port 9094 (SASL/SCRAM) or 9098 (IAM)
 
Enable TLS with tls.status: true for encrypted connections.
Message Delivery Guarantees
The acknowledgments setting controls delivery guarantees:
| Level | Behavior | Use Case | 
|---|---|---|
none | No acknowledgment from broker | Maximum throughput, lowest durability | 
leader | Acknowledgment from partition leader only | Balanced throughput and durability | 
all | Acknowledgment from all in-sync replicas | Maximum durability | 
Compression
Message compression reduces network bandwidth and storage requirements:
| Algorithm | Compression Ratio | CPU Usage | Speed | 
|---|---|---|---|
none | None | Minimal | Fastest | 
gzip | High | High | Slow | 
snappy | Medium | Low | Fast | 
lz4 | Medium | Low | Fast | 
zstd | High | Medium | Medium | 
Examples
Basic Configuration (Unauthenticated)
The minimum configuration for an MSK target without authentication:
targets:
  - name: basic_msk
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9092
      topic: "application-logs"
      algorithm: "none"
With SASL/SCRAM-SHA-512
Configuration using SASL/SCRAM-SHA-512 authentication:
targets:
  - name: msk_sasl
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9094
      topic: "authenticated-logs"
      algorithm: "scram-sha-512"
      username: "kafka-producer"
      password: "stored-in-secrets-manager"
      tls:
        status: true
With SASL/SCRAM-SHA-256
Configuration using SASL/SCRAM-SHA-256 authentication:
targets:
  - name: msk_scram256
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9094
      topic: "secure-logs"
      algorithm: "scram-sha-256"
      username: "kafka-user"
      password: "secure-password"
      compression: "snappy"
      tls:
        status: true
IAM Authentication
Configuration for MSK with IAM authentication:
targets:
  - name: msk_iam
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9098
      topic: "iam-logs"
      algorithm: "none"
      tls:
        status: true
For IAM authentication, configure AWS credentials on the DataStream host using environment variables, IAM role, or AWS credentials file. The IAM policy must grant appropriate Kafka permissions.
With Compression
Configuration with zstd compression:
targets:
  - name: compressed_msk
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9094
      topic: "compressed-logs"
      algorithm: "scram-sha-256"
      username: "kafka-producer"
      password: "password-here"
      compression: "zstd"
      tls:
        status: true
High Throughput
Configuration optimized for maximum throughput:
targets:
  - name: high_throughput_msk
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9094
      topic: "high-volume-logs"
      algorithm: "scram-sha-512"
      username: "kafka-producer"
      password: "password-here"
      compression: "lz4"
      acknowledgments: "leader"
      max_bytes: 1048576
      max_events: 10000
      tls:
        status: true
High Reliability
Configuration optimized for maximum durability:
targets:
  - name: reliable_msk
    type: amazonmsk
    pipelines:
      - checkpoint
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9094
      topic: "critical-logs"
      algorithm: "scram-sha-512"
      username: "kafka-producer"
      password: "password-here"
      compression: "zstd"
      acknowledgments: "all"
      max_events: 100
      disable_idempotent_write: false
      tls:
        status: true
        min_tls_version: "tls1.3"
With Field Normalization
Using field normalization for standard format:
targets:
  - name: normalized_msk
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9094
      topic: "normalized-logs"
      algorithm: "scram-sha-256"
      username: "kafka-producer"
      password: "password-here"
      field_format: "cim"
      compression: "snappy"
      tls:
        status: true
MSK Serverless
Configuration for MSK Serverless clusters:
targets:
  - name: msk_serverless
    type: amazonmsk
    properties:
      address: "boot-abc123.c1.kafka-serverless.us-east-1.amazonaws.com"
      port: 9098
      topic: "serverless-logs"
      algorithm: "none"
      compression: "zstd"
      tls:
        status: true
With Client ID
Configuration with client ID for tracking:
targets:
  - name: msk_tracked
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9094
      topic: "tracked-logs"
      client_id: "datastream-producer-01"
      algorithm: "scram-sha-256"
      username: "kafka-producer"
      password: "password-here"
      compression: "lz4"
      tls:
        status: true
Scheduled Batching
Configuration with scheduled batch delivery:
targets:
  - name: scheduled_msk
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9094
      topic: "scheduled-logs"
      algorithm: "scram-sha-512"
      username: "kafka-producer"
      password: "password-here"
      max_events: 5000
      interval: "5m"
      compression: "gzip"
      tls:
        status: true
Auto Topic Creation
Configuration with automatic topic creation enabled:
targets:
  - name: auto_topic_msk
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9094
      topic: "dynamic-logs"
      algorithm: "scram-sha-256"
      username: "kafka-producer"
      password: "password-here"
      allow_auto_topic_creation: true
      compression: "snappy"
      max_events: 500
      tls:
        status: true