Pipelines

A pipeline defines how data flows from inputs, through processors, to outputs.

Pipeline Structure

Every pipeline has three main sections:

```yaml
input:
  # Where data comes from
  kafka:
    addresses: ["localhost:9092"]
    topics: ["logs"]

pipeline:
  processors:
    # What to do with the data
    - mapping: |
        root.message = this.msg.uppercase()
        root.timestamp = now()

output:
  # Where data goes
  s3:
    bucket: processed-logs
    path: "logs/${!timestamp_unix()}.json"
```

Inputs

Sources of data:

  • Message queues: Kafka, RabbitMQ, NATS
  • Databases: PostgreSQL, MongoDB, MySQL
  • Files: Local, S3, SFTP
  • HTTP: Webhooks, APIs
  • Streams: TCP, UDP, WebSocket
  • Generate: Test data

Browse all inputs →
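
For quick local testing, the generate input listed above can produce synthetic messages without any external system. A minimal sketch (the interval and the message fields are illustrative, not defaults):

```yaml
input:
  generate:
    # Emit one synthetic message per second
    interval: 1s
    mapping: |
      root.msg = "test event"
      root.level = "INFO"
```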


Processors

Transform, filter, and enrich data:

  • Mapping: Transform with Bloblang
  • Filter: Drop unwanted messages
  • Parse: JSON, CSV, XML, Avro, Protobuf
  • Enrich: Lookup external data
  • Aggregate: Batch, window, group

Browse all processors →
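
Processors run in the order listed, so parsing and filtering are often combined in a single Bloblang mapping. A sketch that parses a raw JSON payload and then drops debug-level messages (the level field is an assumption about your data shape):

```yaml
pipeline:
  processors:
    - mapping: |
        # Parse the raw payload, then drop DEBUG messages
        root = content().parse_json()
        root = if this.level == "DEBUG" { deleted() }
```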


Outputs

Destinations for processed data:

  • Message queues: Kafka, RabbitMQ, NATS
  • Databases: PostgreSQL, Elasticsearch
  • Object storage: S3, GCS, Azure Blob
  • HTTP: Webhooks, APIs
  • Files: Local, S3, SFTP
  • Observability: Datadog, Prometheus

Browse all outputs →
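
Outputs mirror the input configuration style. For example, writing processed messages back to Kafka might look like this (the topic name is illustrative):

```yaml
output:
  kafka:
    addresses: ["localhost:9092"]
    # Destination topic (illustrative name)
    topic: processed-logs
```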


Simple Example

Read log files, keep only error lines, and print them to the terminal:

```yaml
input:
  file:
    paths: ["/var/log/*.log"]

pipeline:
  processors:
    - mapping: |
        # Only keep ERROR logs (content() reads the raw line,
        # since plain log lines are not JSON)
        root = if !content().string().contains("ERROR") { deleted() }

output:
  stdout:
    codec: lines
```

Multi-Output Example

Route data to different destinations:

```yaml
input:
  http_server:
    address: "0.0.0.0:8080"

pipeline:
  processors:
    - mapping: |
        # Parse the raw request body as JSON
        root = content().parse_json()

output:
  broker:
    pattern: fan_out
    outputs:
      # Errors to Slack
      - switch:
          cases:
            - check: this.level == "ERROR"
              output:
                http_client:
                  url: "https://slack.com/webhook"

      # Metrics to Prometheus
      - switch:
          cases:
            - check: this.type == "metric"
              output:
                prometheus_push_gateway:
                  url: "http://prometheus:9091"

      # Everything to S3
      - s3:
          bucket: all-events
```

Deployment

Pipelines deploy to nodes based on:

  • Direct selection: Choose specific nodes
  • Label selectors: Target nodes with matching labels
  • Network: All nodes in a network

Example with labels:

# Deploy to production log processors only
```yaml
selector:
  env: production
  role: log-processor
```

What's Next?

👉 Components - Learn about building blocks

👉 Build a Pipeline - Get hands-on

👉 Bloblang Guide - Master data transformations