Pipelines: Overview
VirtualMetric DataStream pipelines are designed to automate large-volume data processing. They can be used to extract values from various sources, transform or convert those values, enrich them by correlating them with other available information, and forward them to various destinations for consumption.
In short, they are aimed at helping you design and implement your telemetry workflow.
Definitions
A pipeline, in its simplest form, is a chain of processors that run sequentially, operating on the incoming data streamed from providers, and directing them to destinations for consumption.
The sources can be devices, networks, or other pipelines, and the targets can be consoles, files, storage systems, or other pipelines. Sources and targets can be connected to each other in one-to-one, one-to-many, many-to-one, and many-to-many configurations.
Each incoming stream can be queried with criteria specific to the information contained therein, and each outgoing stream can be enriched with inferences made from correlations for use on the destination side.
The following summarizes the basic structure of a pipeline:

Provider (raw data) → Pipeline (transformation) → Consumer (processed data)

where Pipeline (transformation) is

(processor₁ ∘ processor₂ ∘ … ∘ processorₙ)(data)
In real-life scenarios, the complexity of the configurations will vary with the requirements of the consumers and the targeted destinations.
Configuration
All pipelines share the following base configuration fields:
Field | Required | Default | Description |
---|---|---|---|
name | Y | | Unique identifier for the pipeline |
description | N | | Optional explanation of the pipeline's purpose |
processors | Y | | Ordered list of processors to execute |
if | N | | Conditional expression that determines whether the pipeline should be applied |
on_failure | N | | Processors to execute if the pipeline fails |
ignore_failure | N | false | Continue pipeline execution despite processor failures |
Use the `name` of the pipeline to refer to it in your configurations.
Example:
pipelines:
  - name: checkpoint
    processors:
      - grok:
          field: message
          patterns:
            - "%{COMMONAPACHELOG}"
      - set:
          field: event.provider
          value: "checkpoint"
This pipeline uses a `grok` processor with a pattern that extracts Apache log data, and a `set` processor to write `checkpoint` to the `event.provider` field.
Deployment
There are multiple ways to deploy pipelines, and they can be used in various configurations:
- A pipeline can consume data from a single provider and, once it has finished processing the data, direct some or all of it to a specific consumer in a one-to-one scheme:

  Provider1[A] → Pipeline → Consumer1(A)

- It can also consume data from multiple providers and deliver each processed stream to a consumer of its own:

  Provider1[A] + Provider2[B] → Pipeline → Consumer1(A) + Consumer2(B)

- Finally, and most commonly in real-life scenarios, there may be multiple providers and consumers connected to each other in much more complex arrangements:

  Provider1[A, B] + Provider2[C] + Provider3[D] → Pipeline → Consumer1(A, C) + Consumer2(A, B, D)
Effectively, each source may be the target of an upstream pipeline, and each target may serve as the source for a downstream one. The detail to keep in mind is that each provider side delegates some processing to the next pipeline based on the established requirements of the consumer side; the pipeline acts as the middleman for the data interchange.
Even when the pipeline routes some of the data without any processing, it does so because the consumer side demands it, so it still performs a meaningful role by forwarding the data according to the consumer's policy.
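As a minimal sketch of the fan-in case, two devices can reference the same pipeline so that their streams are merged before processing. The device names, ports, and the reuse of the checkpoint pipeline below are illustrative; the device fields follow the format shown under Pipeline Chaining further down.

devices:
  - name: frontend_logs      # illustrative device
    type: http
    pipelines:
      - checkpoint           # both devices feed the same pipeline
    properties:
      port: 8081
  - name: backend_logs       # illustrative device
    type: http
    pipelines:
      - checkpoint
    properties:
      port: 8082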
Pipeline Types
Pipelines can be categorized based on their role in the data processing workflow:
- Pre-processing - These pipelines prepare raw data for further processing: parse raw logs, normalize timestamps, remove unnecessary fields
- Routing - These pipelines determine where data should be directed: filter events by criteria, tag data for specific destinations (see the sketch after this list)
- Transformation - These pipelines change data structure or format: field mapping, data type conversion, schema alignment
- Enrichment - These pipelines add context to existing data: lookup tables, geolocation, threat intelligence integration
- Post-processing - These pipelines prepare data for specific targets: format adaptation, field mapping to destination schemas
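As an illustration of the routing case, a pipeline might tag events for a particular destination with a set processor guarded by an if condition. The tag field, its value, and the condition below are illustrative; the condition syntax follows the Conditional Execution section further down.

pipelines:
  - name: route_firewall_events       # illustrative name
    description: "Tag firewall events for the SIEM destination"
    processors:
      - set:
          field: destination_tag      # illustrative routing field
          value: siem
          if: "_ingest.key.source == 'checkpoint'"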
Use Cases
Pipelines can be put to good use to implement some common patterns for data processing. The following table summarizes a few typical layouts and their recommended processors:
Use Case | Recommended Processors |
---|---|
Log Parsing | grok → date → rename → remove |
Security Events | kv → geoip → set → script |
API Data | json → convert → rename → set |
CSV Processing | csv → convert → rename |
Field Standardization | rename → lowercase → set |
And here are some examples:
- Log Parsing and Normalization: A common use case is parsing and normalizing log messages from various sources. For example, a pipeline that processes syslog messages might look like this:
pipelines:
  - name: syslog_parser
    description: "Parse and normalize syslog messages"
    processors:
      - grok:
          field: message
          patterns:
            - "%{SYSLOGLINE}"
      - date:
          field: timestamp
          target_field: "@timestamp"
          formats:
            - "MMM dd HH:mm:ss"
            - "MMM d HH:mm:ss"
      - set:
          field: event.kind
          value: event
      - rename:
          fields:
            - from: program
              to: process.name
            - from: logsource
              to: host.hostname
- Field Enrichment: A pipeline can be used to enrich security events with additional context, such as geolocation data or threat intelligence:
pipelines:
  - name: security_enrichment
    description: "Enrich security events with additional context"
    processors:
      - geoip:
          field: source.ip
          target_field: source.geo
      - set:
          field: event.category
          value: security
      - script:
          lang: Go
          source: |
            if strings.HasPrefix(source.ip, "192.168.") {
              source.type = "internal"
            } else {
              source.type = "external"
            }
- Data Transformation: A sequence of processors can be used to transform data formats, such as converting IP addresses or normalizing field names:
pipelines:
  - name: data_normalizer
    description: "Standardize field names and values"
    processors:
      - rename:
          fields:
            - from: src
              to: source.ip
            - from: dst
              to: destination.ip
      - convert:
          field: bytes
          type: long
      - remove:
          field:
            - bytes_in
            - bytes_out
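The CSV Processing layout from the table above can be sketched along the same lines. The field names below are illustrative, and only the csv processor's source field is shown; its remaining options are omitted here.

pipelines:
  - name: csv_import                  # illustrative name
    description: "Parse CSV records and normalize field names and types"
    processors:
      - csv:
          field: message              # assumed raw CSV payload; csv-specific options omitted
      - convert:
          field: bytes
          type: long
      - rename:
          fields:
            - from: src
              to: source.ip
            - from: dst
              to: destination.ip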
Implementation Strategies
Streamlined Streams
The incoming data streams will each have their own structure. As such, they will at best be only partially suitable for analysis and, later, decision making. In its raw form, data frequently is not, or at least may not be, what it seems to be. It has to be sorted and sifted through to pick out the relevant information.
The process of selecting or discarding items based on specific criteria is called curation. This involves checking whether a field's value matches or contains the values or fragments of values we are looking for.
After this, the remaining data may need to be converted into forms making them more suitable for analysis and use. This second phase is called transformation.
Finally, the data may contain hints or fragments of information which, when correlated with other available information, yield insights that are prerequisite to analysis and use. Adding correlated information in order to increase the data's relevance is known as enrichment.
It is through this seamless three-stage design that a pipeline truly shines and proves itself indispensable for telemetry.
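Put together, the three stages might look roughly like the following single pipeline. The field names are illustrative, and each stage maps onto processors already introduced above: remove for curation, rename and convert for transformation, and geoip for enrichment.

pipelines:
  - name: firewall_telemetry          # illustrative name
    processors:
      # Curation: discard fields that are irrelevant to analysis
      - remove:
          field:
            - debug_info
            - raw_padding
      # Transformation: normalize names and types
      - rename:
          fields:
            - from: src
              to: source.ip
      - convert:
          field: bytes
          type: long
      # Enrichment: correlate with additional context
      - geoip:
          field: source.ip
          target_field: source.geo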
Pipeline Composition
Divide and conquer your data processing by composing targeted pipelines:
- Modular Design: Create small, focused pipelines that handle specific tasks
- Reusable Components: Build common processing patterns as reusable pipeline segments
- Conditional Processing: Use conditional statements to selectively apply processors
- Error Handling: Implement robust error handling with `on_failure` and `ignore_failure` (see the sketch after this list)
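As a minimal sketch of the last point, the `on_failure` and `ignore_failure` fields from the configuration table can be combined at the pipeline level; the fallback field and value below are illustrative.

pipelines:
  - name: resilient_parser            # illustrative name
    ignore_failure: true              # continue execution despite processor failures
    processors:
      - grok:
          field: message
          patterns:
            - "%{SYSLOGLINE}"
    on_failure:
      - set:
          field: failure_tag          # illustrative field marking events that failed parsing
          value: grok_error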
Conditional Execution
Pipelines and processors support conditional execution using the `if` parameter:
pipelines:
  - name: web_logs
    processors:
      - set:
          field: event.category
          value: web
          if: "_ingest.key.source == 'apache' || _ingest.key.source == 'nginx'"
      - geoip:
          field: client.ip
          target_field: client.geo
          if: "_ingest.key.client?.ip != null"
Pipeline Chaining
Pipelines can be chained together using routes or referenced in device configurations:
devices:
  - name: web_server_logs
    type: http
    pipelines:
      - common_enrichment
      - web_logs
    properties:
      port: 8080
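The common_enrichment pipeline referenced above is not defined in this example; a reusable segment of that kind might look like the following sketch, built from the geoip and set processors shown earlier. The event.module field and its value are illustrative.

pipelines:
  - name: common_enrichment
    description: "Enrichment steps shared by several device configurations"
    processors:
      - geoip:
          field: client.ip
          target_field: client.geo
      - set:
          field: event.module         # illustrative field
          value: web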
Next Steps
For an example of a locally validated pipeline, see the A Local Pipeline tutorial.