RabbitMQ
Synopsis
Creates a consumer that connects to RabbitMQ servers and consumes messages from specified exchanges and queues. Supports multiple authentication methods, exchange types, TLS encryption, and multiple workers with automatic message acknowledgment.
Schema
- id: <numeric>
  name: <string>
  description: <string>
  type: rabbitmq
  tags: <string[]>
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    address: <string>
    port: <numeric>
    username: <string>
    password: <string>
    authentication: <string>
    exchange:
      name: <string>
      type: <string>
    queue:
      name: <string>
      key: <string>
    reuse: <boolean>
    workers: <numeric>
    buffer_size: <numeric>
    stats_frequency: <numeric>
    tls:
      status: <boolean>
      cert_name: <string>
      key_name: <string>
Configuration
The following are the minimum requirements to define the device.
Device
Field | Required | Default | Description |
---|---|---|---|
id | Y | - | Unique identifier |
name | Y | - | Device name |
description | N | - | Optional description |
type | Y | - | Must be rabbitmq |
tags | N | - | Optional tags |
pipelines | N | - | Optional pre-processor pipelines |
status | N | true | Enable/disable the device |
Connection
Field | Required | Default | Description |
---|---|---|---|
address | N | "0.0.0.0" | RabbitMQ server address |
port | Y | - | RabbitMQ server port |
username | N | - | Authentication username |
password | N | - | Authentication password |
authentication | Y | - | Auth type (plain, amqplain) |
Exchange
Field | Required | Default | Description |
---|---|---|---|
exchange.name | Y | - | Exchange name |
exchange.type | Y | - | Exchange type (direct, fanout, topic, x-custom) |
Queue
Field | Required | Default | Description |
---|---|---|---|
queue.name | Y | - | Queue name |
queue.key | Y | - | Routing key pattern |
TLS
Field | Required | Default | Description |
---|---|---|---|
tls.status | N | false | Enable TLS encryption |
tls.cert_name | Y | - | TLS certificate file path (required if TLS enabled) |
tls.key_name | Y | - | TLS private key file path (required if TLS enabled) |
The TLS certificate and key files must be placed in the service root directory.
Performance
Field | Required | Default | Description |
---|---|---|---|
reuse | N | true | Enable multi-worker mode |
workers | N | 4 | Number of worker processes when reuse is enabled |
buffer_size | N | 9000 | Read buffer size in bytes |
stats_frequency | N | 300 | Statistics collection interval in seconds |
Advanced Features
The following are unique features that Director offers.
Exchange Types
The collector supports various exchange types:
Exchange Type | Description |
---|---|
direct | Exact routing key match |
fanout | Broadcast to all bound queues |
topic | Pattern-based routing using wildcards |
x-custom | Custom exchange types (if supported by server) |
Multiple Workers
When reuse is enabled, the collector uses multiple workers, each of which maintains its own RabbitMQ consumer and processes messages independently. Messages are automatically acknowledged.
The worker count will be capped at the number of available CPU cores.
Messages
The collector supports multiple exchange types, pattern-based routing, message acknowledgment and rejection, dead letter exchanges, and custom message processing pipelines. It also supports TLS-encrypted connections and multiple authentication methods.
Examples
The following are commonly used configuration types.
Basic
A basic consumer can be easily created.
Creating a simple RabbitMQ consumer...
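A minimal sketch of such a configuration, using only the fields listed in the Schema section. The host name, credentials, and exchange and queue names are placeholder assumptions chosen for illustration, not values defined by Director:

```yaml
# Illustrative values only; field names follow the Schema section.
- id: 1
  name: basic_rabbitmq
  type: rabbitmq
  properties:
    address: "rabbitmq.example.com"   # hypothetical host
    port: 5672                        # conventional AMQP port
    username: "consumer"              # hypothetical credentials
    password: "change-me"
    authentication: plain
    exchange:
      name: "logs"                    # hypothetical exchange name
      type: direct
    queue:
      name: "log_queue"               # hypothetical queue name
      key: "app.logs"                 # routing key, matched exactly by a direct exchange
```

All performance and TLS fields are omitted here, so the defaults from the tables above would apply.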
Secure
The collector can connect to secure servers:
Connecting with authentication and encryption...
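One way this might look, assuming the certificate and key have already been copied into the service root directory. The file names, host, and credentials below are hypothetical:

```yaml
# Illustrative values only; field names follow the Schema section.
- id: 2
  name: secure_rabbitmq
  type: rabbitmq
  properties:
    address: "rabbitmq.example.com"   # hypothetical host
    port: 5671                        # conventional AMQP-over-TLS port
    username: "consumer"              # hypothetical credentials
    password: "change-me"
    authentication: plain
    exchange:
      name: "secure_logs"             # hypothetical exchange name
      type: direct
    queue:
      name: "secure_queue"            # hypothetical queue name
      key: "app.secure"
    tls:
      status: true
      cert_name: "cert.pem"           # hypothetical file name in the service root
      key_name: "key.pem"             # hypothetical file name in the service root
```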
High-Volume
Performance can be enhanced for high message volumes:
Optimizing for high throughput...
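A sketch of a throughput-oriented configuration built from the Performance fields. The specific numbers are assumptions for illustration rather than tuned recommendations, and the names are placeholders:

```yaml
# Illustrative values only; field names follow the Schema section.
- id: 3
  name: highvolume_rabbitmq
  type: rabbitmq
  properties:
    address: "rabbitmq.example.com"   # hypothetical host
    port: 5672
    username: "consumer"              # hypothetical credentials
    password: "change-me"
    authentication: plain
    exchange:
      name: "events"                  # hypothetical exchange name
      type: fanout
    queue:
      name: "event_queue"             # hypothetical queue name
      key: "events"
    reuse: true                       # enable multi-worker mode
    workers: 8                        # capped at the number of available CPU cores
    buffer_size: 32768                # read buffer in bytes (default is 9000)
    stats_frequency: 60               # collect statistics every 60 seconds
```

Because the worker count is capped at the number of CPU cores, a value above the core count should have no additional effect.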
Topic Exchange
Pattern-based message routing is possible:
Configuring topic-based routing...
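A possible topic-exchange configuration, with a wildcard pattern in queue.key. The exchange name, queue name, and routing pattern are hypothetical:

```yaml
# Illustrative values only; field names follow the Schema section.
- id: 4
  name: topic_rabbitmq
  type: rabbitmq
  properties:
    address: "rabbitmq.example.com"   # hypothetical host
    port: 5672
    username: "consumer"              # hypothetical credentials
    password: "change-me"
    authentication: plain
    exchange:
      name: "app_topics"              # hypothetical exchange name
      type: topic
    queue:
      name: "error_queue"             # hypothetical queue name
      # Matches e.g. logs.auth.error, but not logs.auth.db.error
      key: "logs.*.error"
```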
Topic routing keys support the * and # wildcards: * matches exactly one word, and # matches zero or more words.
Pipelines
Messages can be pre-processed:
Applying custom processing to messages...
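A sketch of attaching pre-processor pipelines to the device, assuming pipelines are referenced by name in the pipelines list. The pipeline names here are hypothetical and would need to match pipelines defined elsewhere in the configuration:

```yaml
# Illustrative values only; field names follow the Schema section.
- id: 5
  name: pipeline_rabbitmq
  type: rabbitmq
  pipelines:
    - parse_json                      # hypothetical pipeline name
    - drop_debug                      # hypothetical pipeline name
  properties:
    address: "rabbitmq.example.com"   # hypothetical host
    port: 5672
    username: "consumer"              # hypothetical credentials
    password: "change-me"
    authentication: plain
    exchange:
      name: "logs"                    # hypothetical exchange name
      type: direct
    queue:
      name: "log_queue"               # hypothetical queue name
      key: "app.logs"
```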
Pipelines are processed sequentially, and can modify or drop messages before ingestion.