Build Your First Pipeline
Learn how to build and test a data pipeline on your local machine - no cloud account needed. You'll have a working pipeline processing data in under 5 minutes.
What You'll Build
A simple pipeline that:
- Generates test data (sensor readings)
- Transforms and filters the data
- Outputs results to your terminal
This gives you a fast feedback loop to learn pipeline concepts and test your configurations before deploying anywhere.
This guide runs pipelines directly from config files. If you prefer submitting jobs via CLI/API (like in production), check out Local Mode - it runs Edge as an API server without needing an orchestrator.
Prerequisites
- Linux, macOS, or Windows
- Terminal access
- 5 minutes
Install Expanso Edge
Expanso Edge is the runtime that executes your pipelines. Install it locally:
Linux and macOS:

```bash
# Install Expanso Edge
curl -fsSL https://get.expanso.io/edge/install.sh | bash

# Verify installation
expanso-edge version
```

Windows:

```bash
# Windows: Use Docker (PowerShell installer not yet available)
docker pull ghcr.io/expanso-io/expanso-edge:nightly

# Or use WSL2
wsl curl -fsSL https://get.expanso.io/edge/install.sh | bash
```
See the Installation Guide Windows tab for detailed setup options including Docker and WSL2.
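If you go the Docker route, you can mount your working directory into the container and run pipelines from there. Here's a minimal sketch; the container's entrypoint and expected paths are assumptions, so adjust to the image's documented usage:

```bash
# Run a pipeline from the current directory via the Docker image (sketch;
# entrypoint arguments and mount path are assumptions)
docker run --rm \
  -v "$(pwd):/work" \
  ghcr.io/expanso-io/expanso-edge:nightly \
  run --config /work/my-pipeline.yaml
```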
Create Your First Pipeline
Create a file called `my-pipeline.yaml`:
```yaml
input:
  generate:
    interval: 1s
    mapping: |
      root.sensor_id = "sensor-" + random_int(max: 5).string()
      root.temperature = random_int(min: 15, max: 35)
      root.humidity = random_int(min: 30, max: 80)
      root.timestamp = now()

pipeline:
  processors:
    - mapping: |
        # Keep the original fields and add a status based on temperature
        root = this
        root.status = if this.temperature > 28 {
          "warning"
        } else {
          "normal"
        }
    - mapping: |
        # Only keep warning messages
        root = if this.status != "warning" { deleted() }

output:
  stdout:
    codec: lines
```
This is a pipeline configuration file that you run directly with `expanso-edge run --config`. It does NOT need the job wrapper (`name`, `type`, `config`).
If you're using Local Mode with the API/CLI, you need the job wrapper format. See the Local Mode guide for the difference.
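For reference, a job-wrapped version might look roughly like the sketch below. The `name`, `type`, and `config` keys come from the note above; treat the exact field names and values as assumptions and check the Local Mode guide for the real schema:

```yaml
# Hypothetical job wrapper shape (sketch) - see the Local Mode guide
name: my-first-pipeline   # assumed job name field
type: pipeline            # assumed job type value
config:
  input:
    generate:
      interval: 1s
      mapping: |
        root.sensor_id = "sensor-" + random_int(max: 5).string()
  # ...rest of the pipeline configuration from above
```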
What This Pipeline Does
Input (generate): Creates fake sensor data every second with random temperatures and humidity values.
Processor 1 (mapping): Copies the input document and adds a status field - "warning" if the temperature is above 28°C, "normal" otherwise.
Processor 2 (mapping): Filters out everything except warnings.
Output (stdout): Prints the warnings to your terminal.
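As an aside, both mapping steps could be collapsed into a single processor. Here's a minimal sketch with the same logic, filtering at the end on the original temperature instead of the derived status:

```yaml
pipeline:
  processors:
    - mapping: |
        # Copy the input, label it, then drop anything that isn't a warning
        root = this
        root.status = if this.temperature > 28 { "warning" } else { "normal" }
        root = if this.temperature <= 28 { deleted() }
```

Keeping the steps separate, as in the main example, makes each one easier to test on its own.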
Run Your Pipeline
Run the pipeline locally:
```bash
expanso-edge run --config my-pipeline.yaml
```
You'll see output like this:
{"humidity":67,"sensor_id":"sensor-3","status":"warning","temperature":31,"timestamp":"2024-11-01T10:23:45Z"}
{"humidity":45,"sensor_id":"sensor-1","status":"warning","temperature":29,"timestamp":"2024-11-01T10:23:46Z"}
{"humidity":52,"sensor_id":"sensor-4","status":"warning","temperature":32,"timestamp":"2024-11-01T10:23:48Z"}
🎉 Your pipeline is running! Press Ctrl+C to stop it.
Try It Yourself
Example 1: Process a Local File
Create a test file with some data:
```bash
cat > data.json << 'EOF'
{"event":"login","user":"alice","timestamp":"2024-11-01T10:00:00Z"}
{"event":"purchase","user":"bob","amount":49.99,"timestamp":"2024-11-01T10:05:00Z"}
{"event":"logout","user":"alice","timestamp":"2024-11-01T10:30:00Z"}
{"event":"purchase","user":"charlie","amount":149.99,"timestamp":"2024-11-01T10:35:00Z"}
EOF
```
Create `file-pipeline.yaml`:
```yaml
input:
  file:
    paths: [./data.json]
    codec: lines

pipeline:
  processors:
    - mapping: |
        # Parse each line as JSON, then only keep purchase events
        root = content().parse_json()
        root = if this.event != "purchase" { deleted() }
    - mapping: |
        # Add a category based on amount
        root = this
        root.category = if this.amount >= 100 {
          "high-value"
        } else {
          "standard"
        }

output:
  stdout:
    codec: lines
```
Run it:
```bash
expanso-edge run --config file-pipeline.yaml
```
Output:
{"amount":49.99,"category":"standard","event":"purchase","timestamp":"2024-11-01T10:05:00Z","user":"bob"}
{"amount":149.99,"category":"high-value","event":"purchase","timestamp":"2024-11-01T10:35:00Z","user":"charlie"}
Example 2: Transform CSV to JSON
Create `users.csv`:
```csv
name,email,age
Alice,[email protected],28
Bob,[email protected],35
Charlie,[email protected],42
```
Create `csv-pipeline.yaml`:
```yaml
input:
  file:
    paths: [./users.csv]
    codec: lines

pipeline:
  processors:
    - mapping: |
        # Split each line into fields, then skip the header line
        let parts = content().string().split(",")
        root = if @message_number == 0 {
          deleted()
        } else {
          {
            "name": $parts.index(0),
            "email": $parts.index(1),
            "age": $parts.index(2).number()
          }
        }

output:
  stdout:
    codec: lines
```
Run it:
```bash
expanso-edge run --config csv-pipeline.yaml
```
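Assuming the header row is skipped as intended, you should see output along these lines:

```json
{"age":28,"email":"[email protected]","name":"Alice"}
{"age":35,"email":"[email protected]","name":"Bob"}
{"age":42,"email":"[email protected]","name":"Charlie"}
```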
What You've Learned
- ✅ Install Expanso Edge locally
- ✅ Create pipeline configurations in YAML
- ✅ Use the `generate` input for testing
- ✅ Process files from your local filesystem
- ✅ Transform data with the `mapping` processor
- ✅ Filter and route data
- ✅ Run pipelines locally with `expanso-edge run`
Next Steps
Now that you've built and tested pipelines locally, you can:
- Test and Debug - Validate configurations, add logging, troubleshoot issues
- Explore Components - Browse 200+ inputs, processors, and outputs
- Learn Bloblang - Master the transformation language
- Deploy to Production - Set up Expanso Cloud and deploy pipelines to your infrastructure
Common Next Steps
- Add Real Data Sources: Replace `generate` with real inputs like HTTP, Kafka, or databases (see the sketch after this list)
- Complex Transformations: Learn more Bloblang patterns for parsing, enrichment, and aggregation
- Multiple Outputs: Send data to files, S3, databases, or monitoring systems
- Error Handling: Add retry logic and dead letter queues
- Production Deployment: Move from local testing to managed deployment with Expanso Cloud
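For instance, swapping the `generate` input for an HTTP server input might look roughly like this. The `http_server` name and its fields follow common conventions in Bloblang-based pipelines and are assumptions here; browse the components reference for the exact schema:

```yaml
# Sketch: accept data over HTTP instead of generating it (field names assumed)
input:
  http_server:
    address: 0.0.0.0:4195   # assumed listen address field
    path: /ingest           # assumed request path field
```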
Tips
Fast Iteration: Edit the YAML file as you go, then stop the pipeline (Ctrl+C) and restart it to pick up your changes.
Start Simple: Test each part of your pipeline separately before combining them.
Use stdout for Debugging: When developing, output to stdout to see exactly what's happening at each step.
Limit Test Data: Use `count` in the `generate` input to produce a fixed number of messages:
```yaml
input:
  generate:
    count: 10      # Generate exactly 10 messages, then stop
    interval: ""   # As fast as possible
    mapping: |
      root = {"test": "data"}
```