# Pipelines
A pipeline defines how data flows from inputs, through processors, to outputs.
## Pipeline Structure
Every pipeline has three main sections:
```yaml
input:
  # Where data comes from
  kafka:
    addresses: ["localhost:9092"]
    topics: ["logs"]

pipeline:
  processors:
    # What to do with the data
    - mapping: |
        root.message = this.msg.uppercase()
        root.timestamp = now()

output:
  # Where data goes
  s3:
    bucket: processed-logs
    path: "logs/${!timestamp_unix()}.json"
```
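The `${!timestamp_unix()}` in the output path is a function interpolation: it is evaluated once per message, so each processed object lands under its own timestamped key.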
## Inputs
Sources of data:
- Message queues: Kafka, RabbitMQ, NATS
- Databases: PostgreSQL, MongoDB, MySQL
- Files: Local, S3, SFTP
- HTTP: Webhooks, APIs
- Streams: TCP, UDP, WebSocket
- Generate: Test data
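The generate input is handy for local experiments: it synthesizes messages on a schedule so you can exercise processors without a real source. A minimal sketch, assuming a Benthos-style `generate` input (the `interval` and `mapping` fields are assumptions if your build differs):

```yaml
input:
  generate:
    interval: 1s        # emit one message per second
    mapping: |
      root.msg = "hello from generate"
      root.sent_at = now()
```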
## Processors
Transform, filter, and enrich data:
- Mapping: Transform with Bloblang
- Filter: Drop unwanted messages
- Parse: JSON, CSV, XML, Avro, Protobuf
- Enrich: Lookup external data
- Aggregate: Batch, window, group
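Processors run in the order listed, so earlier steps feed later ones. A minimal sketch chaining a parse step and a filter step (the `level` field is a placeholder for whatever your payloads actually carry):

```yaml
pipeline:
  processors:
    # Parse the raw payload into structured JSON.
    - mapping: |
        root = content().parse_json()
    # Drop anything below WARN severity.
    - mapping: |
        root = if !["WARN", "ERROR"].contains(this.level) { deleted() }
```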
## Outputs
Destinations for processed data:
- Message queues: Kafka, RabbitMQ, NATS
- Databases: PostgreSQL, Elasticsearch
- Object storage: S3, GCS, Azure Blob
- HTTP: Webhooks, APIs
- Files: Local, S3, SFTP
- Observability: Datadog, Prometheus
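Output fields support the same `${!...}` interpolation used in the structure example above, which lets a single output fan messages into per-value destinations. A minimal sketch, assuming a `file` output with `path` and `codec` fields and a parsed `level` field on each message:

```yaml
output:
  file:
    # One file per log level, e.g. /var/out/ERROR.jsonl
    path: "/var/out/${!this.level}.jsonl"
    codec: lines
```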
## Simple Example
Read files, keep only ERROR lines, and print them to the terminal:

```yaml
input:
  file:
    paths: ["/var/log/*.log"]

pipeline:
  processors:
    - mapping: |
        # Keep only ERROR logs; content() reads the raw line,
        # which works even when it isn't valid JSON.
        root = if !content().contains("ERROR") { deleted() }

output:
  stdout:
    codec: lines
```
## Multi-Output Example
Route data to different destinations. With the `fan_out` pattern every message is delivered to all three outputs; each `switch` then drops messages that don't match its `check`:
```yaml
input:
  http_server:
    address: "0.0.0.0:8080"

pipeline:
  processors:
    - mapping: |
        # Parse the raw request body; this.parse_json() would fail
        # here because `this` is already the parsed document.
        root = content().parse_json()

output:
  broker:
    pattern: fan_out
    outputs:
      # Errors to Slack
      - switch:
          cases:
            - check: this.level == "ERROR"
              output:
                http_client:
                  url: "https://slack.com/webhook"
      # Metrics to Prometheus
      - switch:
          cases:
            - check: this.type == "metric"
              output:
                prometheus_push_gateway:
                  url: "http://prometheus:9091"
      # Everything to S3
      - s3:
          bucket: all-events
```
## Deployment
Pipelines deploy to nodes based on:
- Direct selection: Choose specific nodes
- Label selectors: Target nodes with matching labels
- Network: All nodes in a network
Example with labels:
```yaml
# Deploy to production log processors only
selector:
  env: production
  role: log-processor
```
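Direct selection isn't shown in this guide; as a purely hypothetical sketch (the `nodes` field name is an assumption, not part of the documented config), pinning a pipeline to specific nodes might look like:

```yaml
# Hypothetical field name: pin this pipeline to two named nodes
nodes:
  - log-processor-01
  - log-processor-02
```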
## What's Next?
👉 Components - Learn about building blocks
👉 Build a Pipeline - Get hands-on
👉 Bloblang Guide - Master data transformations