HTTP to File Pipeline
This example demonstrates receiving data via HTTP POST requests, applying transformations, batching events, and writing them to files. It's useful for receiving webhook events, logs, or metrics from external systems.
Download & Run
Quick Start:
# Download and run directly
curl -sSL https://docs.expanso.io/examples/http-to-file.yaml | expanso-edge run -
# Or download first, customize, then run
curl -o my-pipeline.yaml https://docs.expanso.io/examples/http-to-file.yaml
expanso-edge run -f my-pipeline.yaml
Download: http-to-file.yaml
What This Pipeline Does
- Receives HTTP POST requests at the /events endpoint
- Validates and parses JSON payloads
- Enriches with metadata (timestamp, node ID, processing ID)
- Batches events into JSON arrays
- Writes batches to timestamped files
Complete Pipeline
input:
  http_server:
    address: 0.0.0.0:8082
    path: /events
    allowed_verbs:
      - POST
    sync_response:
      status: "202"
      headers:
        Content-Type: application/json

pipeline:
  processors:
    # Validate and parse JSON; drop messages that fail to parse
    - mapping: |
        root = content().parse_json().catch(deleted())

    # Add metadata
    - mapping: |
        root.received_at = now()
        root.node_id = env("NODE_ID").or("unknown")
        root.processing_id = uuid_v4()

    # Group into batches
    - archive:
        format: json_array

    # Add batch metadata
    - mapping: |
        root.batch_id = uuid_v4()
        root.batch_size = this.length()
        root.events = this

output:
  file:
    path: /var/log/expanso/events/${!timestamp_unix()}.json
Configuration Breakdown
Input: HTTP Server
input:
  http_server:
    address: 0.0.0.0:8082
    path: /events
    allowed_verbs:
      - POST
    sync_response:
      status: "202"
      headers:
        Content-Type: application/json
The http_server input listens on port 8082 and accepts POST requests at /events. It responds immediately with HTTP 202 (Accepted) to acknowledge receipt without waiting for processing to complete.
Note: The sync_response.status must be a string, not a number.
See: HTTP Server Input
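Because allowed_verbs only lists POST, the endpoint refuses other methods. A quick check that the server is up and enforcing this, assuming the pipeline is running locally on the address configured above:

# POST with a JSON body is accepted and answered with the configured 202
curl -i -X POST http://localhost:8082/events -d '{"ping":true}'

# GET is not in allowed_verbs, so the server refuses it
curl -i -X GET http://localhost:8082/events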
Processors: Validate, Enrich, Batch
1. Parse and Validate JSON
- mapping: |
    root = content().parse_json().catch(deleted())
Attempts to parse the raw request body as JSON. If parsing fails, catch() replaces the message with deleted(), dropping it so invalid data never proceeds through the pipeline.
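To see the validation in action, send one well-formed and one malformed payload; the malformed one is dropped by the mapping and never reaches the file output:

# Parses cleanly and continues through the pipeline
curl -X POST http://localhost:8082/events -d '{"event":"test.ok"}'

# Fails to parse, so catch(deleted()) drops it silently
curl -X POST http://localhost:8082/events -d 'not-json'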
2. Add Metadata
- mapping: |
    root.received_at = now()
    root.node_id = env("NODE_ID").or("unknown")
    root.processing_id = uuid_v4()
Enriches each event with:
- received_at: Current timestamp
- node_id: From the NODE_ID environment variable, defaults to "unknown"
- processing_id: Unique UUID for tracking
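Since env("NODE_ID") reads from the environment of the expanso-edge process, set the variable when launching the pipeline; otherwise events carry the "unknown" fallback (the node name here is illustrative):

NODE_ID=edge-001 expanso-edge run -f my-pipeline.yaml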
3. Batching with Archive
- archive:
    format: json_array
Collapses each batch of events into a single message containing a JSON array, so one file holds many events rather than one file per event.
See: Archive Processor
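If downstream tooling prefers newline-delimited JSON (NDJSON) to arrays, Benthos-style archive processors also offer a lines format; a minimal sketch, assuming expanso-edge exposes the same option:

# One event per line instead of a single JSON array
- archive:
    format: lines

Note that the batch-metadata mapping in the next step assumes an array (this.length(), root.events = this), so it would need adjusting under this format.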
4. Batch Metadata
- mapping: |
    root.batch_id = uuid_v4()
    root.batch_size = this.length()
    root.events = this
Wraps the batched events array with metadata:
- batch_id: Unique identifier for this batch
- batch_size: Number of events in the batch
- events: The array of events
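The wrapper can carry more than identifiers. As a sketch, the first_received and last_received fields below are illustrative additions that record the time span a batch covers, using the received_at stamp added earlier:

- mapping: |
    root.batch_id = uuid_v4()
    root.batch_size = this.length()
    # Illustrative extras: timestamps of the first and last events in the batch
    root.first_received = this.index(0).received_at
    root.last_received = this.index(-1).received_at
    root.events = this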
Output: File
output:
  file:
    path: /var/log/expanso/events/${!timestamp_unix()}.json
Writes each batch to a file with a Unix timestamp in the filename. The ${!timestamp_unix()} function interpolation is evaluated for each write, so filenames are unique down to one-second resolution.
Example filename: /var/log/expanso/events/1706191234.json
See: File Output
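Because timestamp_unix() has one-second resolution, two batches written within the same second would target the same path. A nanosecond timestamp narrows that window considerably; a sketch, assuming the Benthos-style timestamp_unix_nano() function is available:

output:
  file:
    # Nanosecond resolution makes filename collisions far less likely
    path: /var/log/expanso/events/${!timestamp_unix_nano()}.json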
Example Usage
Send a Test Event
curl -X POST http://localhost:8082/events \
  -H "Content-Type: application/json" \
  -d '{"event":"user.signup","user_id":"12345","timestamp":"2024-01-15T10:30:00Z"}'
Expected Response
HTTP/1.1 202 Accepted
Content-Type: application/json
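You can see the response status and headers directly by adding -i to the request above:

curl -i -X POST http://localhost:8082/events \
  -H "Content-Type: application/json" \
  -d '{"event":"user.signup","user_id":"12345","timestamp":"2024-01-15T10:30:00Z"}'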
Resulting File Content
After batching, the output file might look like:
{
  "batch_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "batch_size": 1,
  "events": [
    {
      "event": "user.signup",
      "user_id": "12345",
      "timestamp": "2024-01-15T10:30:00Z",
      "received_at": "2024-01-15T10:30:01.234Z",
      "node_id": "edge-001",
      "processing_id": "b2c3d4e5-f6a7-8901-bcde-f12345678901"
    }
  ]
}
Common Variations
Filter Specific Events
Only process certain event types:
pipeline:
  processors:
    # Validate and parse JSON; drop messages that fail to parse
    - mapping: |
        root = content().parse_json().catch(deleted())

    # Filter to specific event types
    - mapping: |
        root = if ["user.signup", "user.login"].contains(this.event) {
          this
        } else {
          deleted()
        }
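To check the filter, send one event of each kind; only allow-listed types should reach a file:

# Kept: user.signup is in the allow-list
curl -X POST http://localhost:8082/events -d '{"event":"user.signup","user_id":"12345"}'

# Dropped: page.view is not in the allow-list, so it never reaches the output
curl -X POST http://localhost:8082/events -d '{"event":"page.view","user_id":"12345"}'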
Route to Multiple Outputs
Send different event types to different files:
output:
  switch:
    cases:
      - check: this.events.index(0).event.has_prefix("user.")
        output:
          file:
            path: /var/log/expanso/user-events/${!timestamp_unix()}.json
      - check: this.events.index(0).event.has_prefix("system.")
        output:
          file:
            path: /var/log/expanso/system-events/${!timestamp_unix()}.json
      - output:
          file:
            path: /var/log/expanso/other-events/${!timestamp_unix()}.json
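A quick way to verify the routing is to send events with different prefixes and list the target directories (paths as configured above):

curl -X POST http://localhost:8082/events -d '{"event":"user.login","user_id":"12345"}'
curl -X POST http://localhost:8082/events -d '{"event":"system.restart","host":"edge-001"}'

# Each event should land in the directory matching its prefix
ls /var/log/expanso/user-events/ /var/log/expanso/system-events/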
Adjust Batch Size
Control how many events are batched together:
pipeline:
  processors:
    # ... parse and enrichment processors ...

output:
  file:
    path: /var/log/expanso/events/${!timestamp_unix()}.json
    batching:
      count: 100
      period: 30s
      processors:
        # Archive and wrap once per flushed batch, so batch_size
        # reflects the batching policy above
        - archive:
            format: json_array
        - mapping: |
            root.batch_id = uuid_v4()
            root.batch_size = this.length()
            root.events = this
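To watch a count-based flush, send enough events to hit the threshold; with count: 100 the hundredth event should trigger a write without waiting for the 30s period:

# Send 100 small test events in a loop
for i in $(seq 1 100); do
  curl -s -X POST http://localhost:8082/events \
    -d "{\"event\":\"load.test\",\"seq\":$i}" > /dev/null
done

ls /var/log/expanso/events/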
Next Steps
- HTTP Server Input - Full configuration options
- Archive Processor - Batching patterns
- Bloblang Guide - Data transformation reference