Skip to main content

Pipeline

Control Flow Elastic Compatible

Synopsis

Executes another pipeline by name, allowing for pipeline reuse and modular configurations.

Schema

pipeline:
- name: <string>
- description: <text>
- if: <script>
- ignore_failure: <boolean>
- ignore_missing_pipeline: <boolean>
- on_failure: <processor[]>
- on_success: <processor[]>
- tag: <string>

Configuration

FieldRequiredDefaultDescription
nameY-Name or reference of the pipeline to execute
descriptionN-Explanatory note
ifN-Condition to run
ignore_failureNfalseSee Handling Failures
ignore_missing_pipelineNfalseIf true, silently continue when referenced pipeline is not found
on_failureN-See Handling Failures
on_successN-See Handling Success
tagN-Identifier

Details

The pipeline processor can reference other pipelines using the syntax {{ IngestPipeline "pipeline-name" }}. The names can be specified with or without the .yml/.yaml extension.

warning

As pipeline references are resolved at runtime, make sure all referenced pipelines exist in your configuration, or set ignore_missing_pipeline to true if they are optional.

warning

Be careful with recursive pipeline references to avoid infinite loops. Currently, the processor does not detect circular dependencies.

Examples

Basic

Another pipeline to be executed...

pipelines:
- name: main
processors:
- pipeline:
name: '{{ IngestPipeline "parse-logs" }}'

must be defined in the configuration:

pipelines:
- name: parse-logs
processors:
- grok:
field: message
pattern: '%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}'

Conditionals

Executing the pipeline based on criteria...

pipeline:
- name: '{{ IngestPipeline "enrich-data" }}'
- if: 'ctx.level == "ERROR"'
- ignore_failure: true

helps control the flow:

pipelines:
- name: enrich-data
processors:
- append:
field: tags
value: needs-review

Error Handling

Handle missing pipelines...

pipeline:
- name: '{{ IngestPipeline "optional-pipeline" }}'
- ignore_missing_pipeline: true
- on_failure:
- append:
field: tags
value: pipeline-failed

and specify fallback actions:

pipelines:
- name: main
processors:
- append:
field: status
value: processed