Skip to main content

RabbitMQ

Pull Push

Synopsis

Creates a consumer that connects to RabbitMQ servers and consumes messages from specified exchanges and queues. Supports multiple authentication methods, exchange types, TLS encryption, and multiple workers with automatic message acknowledgment.

Schema

- id: <numeric>
name: <string>
description: <string>
type: rabbitmq
tags: <string[]>
pipelines: <pipeline[]>
status: <boolean>
properties:
address: <string>
port: <numeric>
username: <string>
password: <string>
authentication: <string>
exchange:
name: <string>
type: <string>
queue:
name: <string>
key: <string>
reuse: <boolean>
workers: <numeric>
buffer_size: <numeric>
stats_frequency: <numeric>
tls:
status: <boolean>
cert_name: <string>
key_name: <string>

Configuration

The following are the minimum requirements to define the device.

Device

FieldRequiredDefaultDescription
idYUnique identifier
nameYDevice name
descriptionN-Optional description
typeYMust be rabbitmq
tagsN-Optional tags
pipelinesN-Optional pre-processor pipelines
statusNtrueEnable/disable the device

Connection

FieldRequiredDefaultDescription
addressN"0.0.0.0"RabbitMQ server address
portYRabbitMQ server port
usernameN-Authentication username
passwordN-Authentication password
authenticationYAuth type (plain, amqplain)

Exchange

FieldRequiredDefaultDescription
exchange.nameYExchange name
exchange.typeYExchange type (direct, fanout, topic, x-custom)

Queue

FieldRequiredDefaultDescription
queue.nameYQueue name
queue.keyYRouting key pattern

TLS

FieldRequiredDefaultDescription
tls.statusNfalseEnable TLS encryption
tls.cert_nameYTLS certificate file path (required if TLS enabled)
tls.key_nameYTLS private key file path (required if TLS enabled)
note

The TLS certificate and key files must be placed in the service root directory.

Performance

FieldRequiredDefaultDescription
reuseNtrueEnable multi-worker mode
workersN4Number of worker processes when reuse enabled
buffer_sizeN9000Read buffer size in bytes
stats_frequencyN300Statistics collection interval in seconds

Advanced Features

The following are unique features that Director offers.

Exchange Types

The collector supports various exchange types:

Exchange TypeDescription
directExact routing key match
fanoutBroadcast to all bound queues
topicPattern-based routing using wildcards
x-customCustom exchange types (if supported by server)

Multiple Workers

When reuse is enabled, the collector uses multiple workers which maintain their own RabbitMQ consumers and process messages independently. Messages are automatically acknowledged.

note

The worker count will be capped at the number of available CPU cores.

Messages

The collector supports multiple exchange types, pattern-based routing, message acknowledgment and rejection, dead letter exchanges, and custom message processing pipelines. It also supports TLS-encrypted connections and multiple authentication methods.

Examples

The following are commonly used configuration types.

Basic

A basic consumer can be easily created.

Creating a simple RabbitMQ consumer...

- id: 1
name: basic_rabbitmq
type: rabbitmq
properties:
address: "rabbitmq.example.com"
port: 5672
authentication: "plain"
exchange:
name: "logs"
type: "direct"
queue:
name: "app_logs"
key: "app.*"

Secure

The collector can connect to secure servers:

Connecting with authentication and encryption...

- id: 2
name: secure_rabbitmq
type: rabbitmq
properties:
address: "rabbitmq.example.com"
port: 5671
username: "consumer"
password: "secret"
authentication: "plain"
exchange:
name: "secure_logs"
type: "topic"
queue:
name: "secure_app_logs"
key: "secure.app.#"
tls:
status: true
cert_name: "rabbitmq.crt"
key_name: "rabbitmq.key"

High-Volume

Performance can be enhanced for high message volumes:

Optimizing for high throughput...

- id: 3
name: performant_rabbitmq
type: rabbitmq
properties:
address: "rabbitmq.example.com"
port: 5672
authentication: "plain"
exchange:
name: "high_volume"
type: "direct"
queue:
name: "high_volume_logs"
key: "logs"
reuse: true
workers: 4
buffer_size: 32768
stats_frequency: 60

Topic Exchange

Pattern-based message routing is possible:

Configuring topic-based routing...

- id: 4
name: topic_rabbitmq
type: rabbitmq
properties:
address: "rabbitmq.example.com"
port: 5672
authentication: "plain"
exchange:
name: "logs"
type: "topic"
queue:
name: "filtered_logs"
key: "app.*.error"
tip

Topic routing keys support * and # wildards for single and multiple words respectively.

Pipelines

Messages can be pre-processed:

Applying custom processing to messages...

- id: 5
name: pipeline_rabbitmq
type: rabbitmq
pipelines:
- json_parser
- field_extractor
properties:
address: "rabbitmq.example.com"
port: 5672
authentication: "plain"
exchange:
name: "raw_logs"
type: "direct"
queue:
name: "processed_logs"
key: "logs"
note

Pipelines are processed sequentially, and can modify or drop messages before ingestion.