Build Your First Pipeline

Learn how to build and test a data pipeline on your local machine, with no cloud account needed. You'll have a working pipeline processing data in under 5 minutes.

What You'll Build

A simple pipeline that:

  1. Generates test data (sensor readings)
  2. Transforms and filters the data
  3. Outputs results to your terminal

This gives you a fast feedback loop to learn pipeline concepts and test your configurations before deploying anywhere.

Want API-Based Development?

This guide runs pipelines directly from config files. If you prefer submitting jobs via CLI/API (as in production), check out Local Mode, which runs Edge as an API server without needing an orchestrator.

Prerequisites

  • Linux, macOS, or Windows
  • Terminal access
  • 5 minutes

Install Expanso Edge

Expanso Edge is the runtime that executes your pipelines. Install it locally:

# Install Expanso Edge
curl -fsSL https://get.expanso.io/edge/install.sh | bash

# Verify installation
expanso-edge version

Create Your First Pipeline

Create a file called my-pipeline.yaml:

input:
  generate:
    interval: 1s
    mapping: |
      root.sensor_id = "sensor-" + random_int(max:5).string()
      root.temperature = random_int(min:15, max:35)
      root.humidity = random_int(min:30, max:80)
      root.timestamp = now()

pipeline:
  processors:
    - mapping: |
        # Add a status field based on temperature
        root.status = if this.temperature > 28 {
          "warning"
        } else {
          "normal"
        }

    - mapping: |
        # Only keep warning messages
        root = if this.status != "warning" { deleted() }

output:
  stdout:
    codec: lines

Pipeline Config vs Job Format

This is a pipeline configuration file that you run directly with expanso-edge run --config. It does NOT need the job wrapper (name, type, config).

If you're using Local Mode with the API/CLI, you need the job wrapper format. See the Local Mode guide for the difference.
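
For reference, the job wrapper nests the same pipeline under a config key alongside name and type fields. A rough sketch, with placeholder values (the Local Mode guide is authoritative):

name: my-pipeline   # placeholder job name
type: batch         # placeholder job type; use whatever Local Mode expects
config:
  input:
    generate:
      interval: 1s
      # ... rest of the pipeline configuration from above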

What This Pipeline Does

Input (generate): Creates fake sensor data every second with random temperatures and humidity values.

Processor 1 (mapping): Adds a status field: "warning" if temperature is above 28°C, "normal" otherwise.

Processor 2 (mapping): Filters out everything except warnings.

Output (stdout): Prints the warnings to your terminal.
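
To make the flow concrete, here is how a single illustrative reading moves through the stages (values are made up):

# As generated by the input:
{"humidity":52,"sensor_id":"sensor-2","temperature":31,"timestamp":"2024-11-01T10:23:45Z"}

# After processor 1 (status added, since 31 > 28):
{"humidity":52,"sensor_id":"sensor-2","status":"warning","temperature":31,"timestamp":"2024-11-01T10:23:45Z"}

# Processor 2 keeps this record because status is "warning"; a 22°C reading
# would get status "normal" and be dropped before reaching the output.

Since generated temperatures span 15-35°C, only about a third of readings become warnings, so expect gaps between output lines.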


Run Your Pipeline

Run the pipeline locally:

expanso-edge run --config my-pipeline.yaml

You'll see output like this:

{"humidity":67,"sensor_id":"sensor-3","status":"warning","temperature":31,"timestamp":"2024-11-01T10:23:45Z"}
{"humidity":45,"sensor_id":"sensor-1","status":"warning","temperature":29,"timestamp":"2024-11-01T10:23:46Z"}
{"humidity":52,"sensor_id":"sensor-4","status":"warning","temperature":32,"timestamp":"2024-11-01T10:23:48Z"}

🎉 Your pipeline is running! Press Ctrl+C to stop it.


Try It Yourself

Example 1: Process a Local File

Create a test file with some data:

cat > data.json << 'EOF'
{"event":"login","user":"alice","timestamp":"2024-11-01T10:00:00Z"}
{"event":"purchase","user":"bob","amount":49.99,"timestamp":"2024-11-01T10:05:00Z"}
{"event":"logout","user":"alice","timestamp":"2024-11-01T10:30:00Z"}
{"event":"purchase","user":"charlie","amount":149.99,"timestamp":"2024-11-01T10:35:00Z"}
EOF

Create file-pipeline.yaml:

input:
  file:
    paths: [./data.json]
    codec: lines

pipeline:
  processors:
    - mapping: |
        root = this.parse_json()

        # Only keep purchase events
        root = if this.event != "purchase" { deleted() }

        # Add a category based on amount
        root.category = if this.amount >= 100 {
          "high-value"
        } else {
          "standard"
        }

output:
  stdout:
    codec: lines

Run it:

expanso-edge run --config file-pipeline.yaml

Output:

{"amount":49.99,"category":"standard","event":"purchase","timestamp":"2024-11-01T10:05:00Z","user":"bob"}
{"amount":149.99,"category":"high-value","event":"purchase","timestamp":"2024-11-01T10:35:00Z","user":"charlie"}

Example 2: Transform CSV to JSON

Create users.csv:

name,email,age
Alice,alice@example.com,28
Bob,bob@example.com,35
Charlie,charlie@example.com,42

Create csv-pipeline.yaml:

input:
  file:
    paths: [./users.csv]
    codec: lines

pipeline:
  processors:
    - mapping: |
        # Split each line into fields, then skip the header line
        let parts = this.split(",")
        root = if @message_number == 0 {
          deleted()
        } else {
          {
            "name": $parts.index(0),
            "email": $parts.index(1),
            "age": $parts.index(2).number()
          }
        }

output:
  stdout:
    codec: lines

Run it:

expanso-edge run --config csv-pipeline.yaml
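
You should see output like this:

{"age":28,"email":"alice@example.com","name":"Alice"}
{"age":35,"email":"bob@example.com","name":"Bob"}
{"age":42,"email":"charlie@example.com","name":"Charlie"}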

What You've Learned

  • ✅ Install Expanso Edge locally
  • ✅ Create pipeline configurations in YAML
  • ✅ Use generate input for testing
  • ✅ Process files from your local filesystem
  • ✅ Transform data with the mapping processor
  • ✅ Filter and route data
  • ✅ Run pipelines locally with expanso-edge run

Next Steps

Now that you've built and tested pipelines locally, you can:

Common Next Steps

  1. Add Real Data Sources: Replace generate with real inputs like HTTP, Kafka, or databases (see the sketch after this list)
  2. Complex Transformations: Learn more Bloblang patterns for parsing, enrichment, and aggregation
  3. Multiple Outputs: Send data to files, S3, databases, or monitoring systems
  4. Error Handling: Add retry logic and dead letter queues
  5. Production Deployment: Move from local testing to managed deployment with Expanso Cloud
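
For the first of these, swapping in a real source usually only changes the input block; the processors and output stay the same. A sketch of the shape (the input name and fields here are illustrative, not confirmed; check the Edge inputs reference for supported inputs):

input:
  http_server:    # hypothetical input name, shown for shape only
    path: /ingest # hypothetical field for where events are POSTed

pipeline:
  processors:
    - mapping: |
        root.received_at = now()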

Tips

Fast Iteration: Edit the YAML file while the pipeline runs, then stop it (Ctrl+C) and restart to pick up your changes.

Start Simple: Test each part of your pipeline separately before combining them.
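
For example, a pipeline that wires an input straight to stdout is a good starting point; add processors one at a time from there. This minimal version uses only components already shown in this guide:

input:
  generate:
    interval: 1s
    mapping: |
      root = {"hello": "world"}

output:
  stdout:
    codec: lines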

Use stdout for Debugging: When developing, output to stdout to see exactly what's happening at each step.

Limit Test Data: Use count in the generate input to produce a fixed number of messages:

input:
  generate:
    count: 10      # Generate exactly 10 messages, then stop
    interval: ""   # As fast as possible
    mapping: |
      root = {"test": "data"}