Kafka

Pull

Synopsis

Creates a collector that connects to Kafka brokers and consumes messages from specified topics. Supports authentication, TLS encryption, and multiple workers.

Schema

- id: <numeric>
  name: <string>
  description: <string>
  type: kafka
  tags: <string[]>
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    address: <string>
    port: <numeric>
    username: <string>
    password: <string>
    group: <string>
    topic: <string>
    balancer: <string>
    reuse: <boolean>
    workers: <numeric>
    buffer_size: <numeric>
    stats_frequency: <numeric>
    tls:
      status: <boolean>
      cert_name: <string>
      key_name: <string>

Configuration

The following are the minimum requirements to define the device.

Device

| Field | Required | Default | Description |
|---|---|---|---|
| id | Y | | Unique identifier |
| name | Y | | Device name |
| description | N | - | Optional description |
| type | Y | | Must be kafka |
| tags | N | - | Optional tags |
| pipelines | N | - | Optional pre-processor pipelines |
| status | N | true | Enable/disable the device |

Connection

| Field | Required | Default | Description |
|---|---|---|---|
| address | N | "0.0.0.0" | Kafka broker address |
| port | Y | | Kafka broker port |
| username | N | - | SASL username |
| password | N | - | SASL password |
| group | N | "vmetric" | Consumer group ID |
| topic | Y | | Topic to consume from |
| balancer | N | "roundrobin" | Partition balancing strategy |
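The roundrobin balancer spreads partitions evenly across the consumers in a group by dealing them out in turn. A minimal sketch of that strategy (the function and names here are illustrative, not the collector's actual internals):

```python
# Illustrative round-robin partition assignment: partition i goes to
# consumer i mod N, so no consumer holds more than one partition more
# than any other.
def round_robin_assign(partitions, consumers):
    """Assign each partition to a consumer in round-robin order."""
    assignment = {c: [] for c in consumers}
    for i, partition in enumerate(partitions):
        consumer = consumers[i % len(consumers)]
        assignment[consumer].append(partition)
    return assignment

# Example: 5 partitions spread across 2 consumers.
print(round_robin_assign([0, 1, 2, 3, 4], ["c1", "c2"]))
# {'c1': [0, 2, 4], 'c2': [1, 3]}
```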

TLS

| Field | Required | Default | Description |
|---|---|---|---|
| tls.status | N | false | Enable TLS encryption |
| tls.cert_name | Y | | TLS certificate file path (required if TLS is enabled) |
| tls.key_name | Y | | TLS private key file path (required if TLS is enabled) |
Note: The TLS certificate and key files must be placed in the service root directory.

Performance

| Field | Required | Default | Description |
|---|---|---|---|
| reuse | N | true | Enable multi-worker mode |
| workers | N | 4 | Number of worker processes when reuse is enabled |
| buffer_size | N | 9000 | Read buffer size in bytes |
| stats_frequency | N | 300 | Statistics collection interval in seconds |

Advanced Features

Multiple Workers

When reuse is enabled, the collector runs multiple workers. Each worker maintains its own Kafka consumer and processes messages independently, automatically balancing message volume across workers. The worker count is capped at the number of available CPU cores.
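The capping rule above can be sketched in a few lines (illustrative only; os.cpu_count stands in for however the collector detects available cores):

```python
import os

def effective_workers(configured_workers: int) -> int:
    """Cap the configured worker count at the number of CPU cores,
    mirroring the behavior described above (illustrative sketch)."""
    cores = os.cpu_count() or 1  # fall back to 1 if undetectable
    return min(configured_workers, cores)

# A request for far more workers than cores is silently capped.
print(effective_workers(1_000_000) <= (os.cpu_count() or 1))  # True
```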

Messages

The collector supports message offset tracking and commits, automatic consumer group rebalancing, multiple topic subscriptions, TLS-encrypted connections and SASL authentication, and custom message-processing pipelines.
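Offset tracking and commits are what make consumption resumable: an offset is committed only after its message has been processed, so a restart replays uncommitted messages rather than losing them. A simplified at-least-once loop (the names here are hypothetical, not the collector's internals):

```python
# Illustrative at-least-once consumption loop: commit the offset only
# after the message has been handed off, so a crash mid-batch replays
# the uncommitted tail instead of dropping it.
def consume(messages, committed, process):
    """Process messages from the first uncommitted offset onward."""
    offset = committed.get("offset", 0)
    while offset < len(messages):
        process(messages[offset])     # hand the message downstream
        offset += 1
        committed["offset"] = offset  # commit only after processing
    return offset

# Resuming from a committed offset of 1 replays only "b" and "c".
seen = []
consume(["a", "b", "c"], {"offset": 1}, seen.append)
print(seen)  # ['b', 'c']
```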

Examples

The following are commonly used configuration types.

Basic

The minimum required configuration creates the consumer:

Creating a simple Kafka consumer...

- id: 1
  name: basic_kafka
  type: kafka
  properties:
    address: "kafka.example.com"
    port: 9092
    topic: "logs"

Secure

The consumer can connect to secure Kafka brokers:

Connecting with authentication and encryption...

- id: 2
  name: secure_kafka
  type: kafka
  properties:
    address: "kafka.example.com"
    port: 9093
    username: "consumer"
    password: "secret"
    topic: "secure-logs"
    tls:
      status: true
      cert_name: "kafka.crt"
      key_name: "kafka.key"

High-Volume

Performance can be enhanced for high message volumes:

Optimizing for throughput...

- id: 3
  name: performant_kafka
  type: kafka
  properties:
    address: "kafka.example.com"
    port: 9092
    topic: "high-volume-logs"
    group: "high-perf-group"
    reuse: true
    workers: 4
    buffer_size: 32768
    stats_frequency: 60
Note: When reuse is enabled, the actual worker count will be capped at the number of available CPU cores.

Consumer Groups

Message consumption can be coordinated:

Configuring consumer group behavior...

- id: 4
  name: group_kafka
  type: kafka
  properties:
    address: "kafka.example.com"
    port: 9092
    topic: "shared-logs"
    group: "processing-group"
    balancer: "roundrobin"
    reuse: true
    workers: 2
Warning: Consumers in the same group must use compatible configuration settings.

Pipelines

Messages can be pre-processed:

Applying custom processing to messages...

- id: 5
  name: pipeline_kafka
  type: kafka
  pipelines:
    - json_parser
    - field_extractor
  properties:
    address: "kafka.example.com"
    port: 9092
    topic: "raw-logs"
    group: "processing-group"
Note: Pipelines are processed sequentially, and can modify or drop messages before ingestion.
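The sequential modify-or-drop behavior can be sketched as follows. The stage names mirror the example above (json_parser, field_extractor), but the implementations here are hypothetical placeholders, not the collector's built-in processors:

```python
import json

def json_parser(raw):
    """Parse a raw message as JSON; drop it if parsing fails."""
    try:
        return json.loads(raw)
    except ValueError:
        return None  # returning None drops the message

def field_extractor(event):
    """Keep only the fields downstream ingestion cares about."""
    return {"level": event.get("level", "info"), "msg": event.get("msg", "")}

def run_pipelines(raw, stages):
    """Run stages in order; a None from any stage drops the message."""
    msg = raw
    for stage in stages:
        msg = stage(msg)
        if msg is None:
            return None  # dropped before ingestion
    return msg

print(run_pipelines('{"level": "warn", "msg": "disk full"}',
                    [json_parser, field_extractor]))
# {'level': 'warn', 'msg': 'disk full'}
print(run_pipelines("not json", [json_parser, field_extractor]))
# None
```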