Basic Log Processing Pipeline

This example demonstrates a simple log processing pipeline that reads from a file, parses JSON logs, and outputs to stdout. It's a great starting point for understanding how Expanso pipelines work.

Download & Run

Quick Start:

# Download and run directly
curl -sSL https://docs.expanso.io/examples/basic-pipeline.yaml | expanso-edge run -

# Or download first, customize, then run
curl -o my-pipeline.yaml https://docs.expanso.io/examples/basic-pipeline.yaml
expanso-edge run -f my-pipeline.yaml

Download: basic-pipeline.yaml

What This Pipeline Does

  1. Reads log lines from a file
  2. Parses each line as JSON
  3. Extracts the timestamp field
  4. Outputs the processed logs to stdout

Complete Pipeline

input:
  file:
    paths: [/var/log/app.log]
    codec: lines

pipeline:
  processors:
    - mapping: |
        # Parse the log line as JSON
        root = this.parse_json()

    - mapping: |
        # Parse the timestamp field into a timestamp value
        root.timestamp = this.timestamp.parse_timestamp("2006-01-02T15:04:05Z")

output:
  stdout:
    codec: lines

Configuration Breakdown

Input: File

input:
  file:
    paths: [/var/log/app.log]
    codec: lines

The file input reads from /var/log/app.log, processing each line individually. The lines codec tells Expanso to treat each line as a separate message.

See: File Input Documentation
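The paths field is a list, so the same input can watch several files at once; Benthos-style file inputs also accept glob patterns, which is assumed (not confirmed) to hold for Expanso here:

```yaml
input:
  file:
    # Multiple files; the glob pattern is an assumed capability
    paths:
      - /var/log/app.log
      - /var/log/app/*.log
    codec: lines
```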

Processors: Parse & Transform

pipeline:
  processors:
    - mapping: |
        # Parse the log line as JSON
        root = this.parse_json()

    - mapping: |
        # Parse the timestamp field into a timestamp value
        root.timestamp = this.timestamp.parse_timestamp("2006-01-02T15:04:05Z")

The pipeline uses two mapping processors (which use the Bloblang language):

  1. JSON Parsing: Converts each log line from a string to a JSON object
  2. Timestamp Parsing: Converts the timestamp string to a proper timestamp type

See: Mapping Processor | Bloblang Guide
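Because Bloblang statements run in sequence, the two steps can also be combined into a single mapping. One subtlety, sketched here under the assumption that Expanso's Bloblang matches Benthos semantics: within a mapping, this always refers to the original input message (the raw string), so the second statement reads from root, the document being built:

```yaml
pipeline:
  processors:
    - mapping: |
        # Parse, then transform the result in the same mapping
        root = this.parse_json()
        root.timestamp = root.timestamp.parse_timestamp("2006-01-02T15:04:05Z")
```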

Output: Stdout

output:
  stdout:
    codec: lines

Outputs each processed message to stdout in JSON format. Useful for testing and debugging.

See: Stdout Output Documentation

Example Input/Output

Input Log Line:

{"timestamp":"2024-01-15T10:30:00Z","level":"INFO","message":"User login successful","user_id":"12345"}

Output:

{
  "timestamp": "2024-01-15T10:30:00Z",
  "level": "INFO",
  "message": "User login successful",
  "user_id": "12345"
}

Common Variations

Add Filtering

Filter out debug-level logs:

pipeline:
  processors:
    - mapping: |
        root = this.parse_json()

        # Drop debug logs (read level from root: this is still the raw line)
        root = if root.level == "DEBUG" { deleted() }
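Relatedly, real log files often contain lines that are not valid JSON. A sketch using Bloblang's catch method to drop unparseable lines instead of failing the message (behavior assumed to match Benthos-style Bloblang):

```yaml
pipeline:
  processors:
    - mapping: |
        # Silently drop any line that fails to parse as JSON
        root = this.parse_json().catch(deleted())
```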

Output to File

Change the output to write to a file instead of stdout:

output:
  file:
    path: /var/log/processed-${! timestamp_unix() }.json
    codec: lines

Add Timestamps

Add a processing timestamp:

pipeline:
  processors:
    - mapping: |
        root = this.parse_json()
        root.processed_at = now()
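These variations compose. A sketch combining parsing, a processing timestamp, and debug filtering in one mapping, with field names as in the examples above (the filter comes last so that a deleted message is not recreated by a later assignment):

```yaml
pipeline:
  processors:
    - mapping: |
        root = this.parse_json()
        root.processed_at = now()
        root = if root.level == "DEBUG" { deleted() }
```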

Next Steps