Basic Log Processing Pipeline
This example demonstrates a simple log processing pipeline that reads from a file, parses JSON logs, and outputs to stdout. It's a great starting point for understanding how Expanso pipelines work.
Download & Run
Quick Start:
```bash
# Download and run directly
curl -sSL https://docs.expanso.io/examples/basic-pipeline.yaml | expanso-edge run -

# Or download first, customize, then run
curl -o my-pipeline.yaml https://docs.expanso.io/examples/basic-pipeline.yaml
expanso-edge run -f my-pipeline.yaml
```
Download: basic-pipeline.yaml
What This Pipeline Does
- Reads log lines from a file
- Parses each line as JSON
- Extracts the timestamp field
- Outputs the processed logs to stdout
Complete Pipeline
```yaml
input:
  file:
    paths: [/var/log/app.log]
    codec: lines

pipeline:
  processors:
    - mapping: |
        # Parse the log line as JSON
        root = this.parse_json()
    - mapping: |
        # Parse the timestamp string into a timestamp value
        root.timestamp = this.timestamp.parse_timestamp("2006-01-02T15:04:05Z")

output:
  stdout:
    codec: lines
```
Configuration Breakdown
Input: File
```yaml
input:
  file:
    paths: [/var/log/app.log]
    codec: lines
```
The file input reads from /var/log/app.log, processing each line individually. The lines codec tells Expanso to treat each line as a separate message.
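The same input can watch several files at once. The snippet below is a sketch of that variation; the second path uses a glob pattern, which the file input supports in Benthos-style configs, but check the File Input documentation for the exact path semantics in your Expanso version:

```yaml
input:
  file:
    # Multiple entries and glob patterns; each matched file is
    # read line by line, just like the single-file case
    paths:
      - /var/log/app.log
      - /var/log/services/*.log
    codec: lines
```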
Processors: Parse & Transform
```yaml
pipeline:
  processors:
    - mapping: |
        # Parse the log line as JSON
        root = this.parse_json()
    - mapping: |
        # Parse the timestamp string into a timestamp value
        root.timestamp = this.timestamp.parse_timestamp("2006-01-02T15:04:05Z")
```
The pipeline uses two mapping processors (which use the Bloblang language):
- JSON Parsing: Converts each log line from a string to a JSON object
- Timestamp Parsing: Converts the timestamp string to a proper timestamp type
See: Mapping Processor | Bloblang Guide
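Note that `parse_json()` fails on lines that are not valid JSON, which by default raises a processing error for that message. A hedged variation, assuming standard Bloblang `catch` semantics, drops unparseable lines instead:

```yaml
pipeline:
  processors:
    - mapping: |
        # catch() provides a fallback when parse_json() fails,
        # so malformed lines are silently dropped rather than erroring
        root = this.parse_json().catch(deleted())
```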
Output: Stdout
```yaml
output:
  stdout:
    codec: lines
```
Outputs each processed message to stdout in JSON format. Useful for testing and debugging.
See: Stdout Output Documentation
Example Input/Output
Input Log Line:
```json
{"timestamp":"2024-01-15T10:30:00Z","level":"INFO","message":"User login successful","user_id":"12345"}
```
Output:
```json
{
  "timestamp": "2024-01-15T10:30:00Z",
  "level": "INFO",
  "message": "User login successful",
  "user_id": "12345"
}
```
Common Variations
Add Filtering
Filter out debug-level logs:
```yaml
pipeline:
  processors:
    - mapping: |
        root = this.parse_json()
        # Drop debug logs
        root = if this.level == "DEBUG" { deleted() }
```
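The same pattern can be inverted to keep only the levels you care about. This sketch assumes the standard Bloblang `contains` method on arrays:

```yaml
pipeline:
  processors:
    - mapping: |
        root = this.parse_json()
        # Keep only WARN and ERROR entries; everything else is dropped
        root = if !["WARN", "ERROR"].contains(this.level) { deleted() }
```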
Output to File
Change the output to write to a file instead of stdout:
```yaml
output:
  file:
    path: /var/log/processed-${! timestamp_unix() }.json
    codec: lines
```
Add Timestamps
Add a processing timestamp:
```yaml
pipeline:
  processors:
    - mapping: |
        root = this.parse_json()
        root.processed_at = now()
```
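Processing metadata beyond a timestamp can also be attached. The sketch below additionally records which node handled the log, assuming the standard Bloblang `hostname()` function is available in your Expanso build:

```yaml
pipeline:
  processors:
    - mapping: |
        root = this.parse_json()
        # now() returns the current time as an RFC 3339 string
        root.processed_at = now()
        # hostname() records which node processed the log
        root.processed_by = hostname()
```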
Next Steps
- File Input - Read from files and streams
- Mapping Processor - Transform data with Bloblang
- Bloblang Guide - Data transformation reference
- Log Processing Example - Advanced log processing patterns