
HTTP to File Pipeline

This example demonstrates receiving data via HTTP POST requests, applying transformations, batching events, and writing them to files. It's a common pattern for ingesting webhook events, logs, or metrics from external systems.

Download & Run

Quick Start:

# Download and run directly
curl -sSL https://docs.expanso.io/examples/http-to-file.yaml | expanso-edge run -

# Or download first, customize, then run
curl -o my-pipeline.yaml https://docs.expanso.io/examples/http-to-file.yaml
expanso-edge run -f my-pipeline.yaml

Download: http-to-file.yaml

What This Pipeline Does

  1. Receives HTTP POST requests at /events endpoint
  2. Validates and parses JSON payloads
  3. Enriches with metadata (timestamp, node ID, processing ID)
  4. Batches events into JSON arrays
  5. Writes batches to timestamped files
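
The five steps above can be modeled in plain Python (an illustrative sketch of the data flow, not Expanso code; all names are hypothetical):

```python
import json
import os
import time
import uuid
from datetime import datetime, timezone

def process_batch(raw_payloads):
    """Model the pipeline: parse, enrich, batch, and name an output file."""
    events = []
    for raw in raw_payloads:
        try:
            event = json.loads(raw)          # step 2: validate and parse JSON
        except json.JSONDecodeError:
            continue                         # invalid payloads are dropped
        # step 3: enrich with metadata
        event["received_at"] = datetime.now(timezone.utc).isoformat()
        event["node_id"] = os.environ.get("NODE_ID", "unknown")
        event["processing_id"] = str(uuid.uuid4())
        events.append(event)
    # steps 4-5: batch into one document destined for a timestamped file
    batch = {
        "batch_id": str(uuid.uuid4()),
        "batch_size": len(events),
        "events": events,
    }
    filename = f"{int(time.time())}.json"
    return batch, filename

batch, name = process_batch(['{"event":"user.signup"}', "not json"])
print(batch["batch_size"])  # 1 -- the invalid payload was dropped
```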

Complete Pipeline

input:
  http_server:
    address: 0.0.0.0:8082
    path: /events
    allowed_verbs:
      - POST
    sync_response:
      status: "202"
      headers:
        Content-Type: application/json

pipeline:
  processors:
    # Validate and parse JSON
    - mapping: |
        let parsed = this.parse_json()
        root = if parsed.exists() { parsed } else { deleted() }

    # Add metadata
    - mapping: |
        root.received_at = now()
        root.node_id = env("NODE_ID").or("unknown")
        root.processing_id = uuid_v4()

    # Group into batches
    - archive:
        format: json_array

    # Add batch metadata
    - mapping: |
        root.batch_id = uuid_v4()
        root.batch_size = this.length()
        root.events = this

output:
  file:
    path: /var/log/expanso/events/${!timestamp_unix()}.json

Configuration Breakdown

Input: HTTP Server

input:
  http_server:
    address: 0.0.0.0:8082
    path: /events
    allowed_verbs:
      - POST
    sync_response:
      status: "202"
      headers:
        Content-Type: application/json

The http_server input listens on port 8082 and accepts POST requests at /events. It responds immediately with HTTP 202 (Accepted) to acknowledge receipt without waiting for processing to complete.

Note: The sync_response.status must be a string, not a number.

See: HTTP Server Input

Processors: Validate, Enrich, Batch

1. Parse and Validate JSON

- mapping: |
    let parsed = this.parse_json()
    root = if parsed.exists() { parsed } else { deleted() }

Attempts to parse the incoming payload as JSON. If parsing fails, the message is deleted (dropped). This prevents invalid data from proceeding through the pipeline.
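
The drop-on-parse-failure behavior is equivalent to this Python sketch (the helper is hypothetical, not part of the pipeline):

```python
import json

def parse_or_drop(raw: str):
    """Return the parsed event, or None to signal the message should be dropped."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return None

print(parse_or_drop('{"event":"user.signup"}'))  # {'event': 'user.signup'}
print(parse_or_drop("not-json"))                 # None
```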

2. Add Metadata

- mapping: |
    root.received_at = now()
    root.node_id = env("NODE_ID").or("unknown")
    root.processing_id = uuid_v4()

Enriches each event with:

  • received_at: Current timestamp
  • node_id: From NODE_ID environment variable, defaults to "unknown"
  • processing_id: Unique UUID for tracking
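
In Python terms, the enrichment step adds three fields to each event (field names match the pipeline; the helper itself is illustrative):

```python
import os
import uuid
from datetime import datetime, timezone

def enrich(event: dict) -> dict:
    """Add received_at, node_id, and processing_id to an event in place."""
    event["received_at"] = datetime.now(timezone.utc).isoformat()
    event["node_id"] = os.environ.get("NODE_ID", "unknown")  # env fallback
    event["processing_id"] = str(uuid.uuid4())
    return event

enriched = enrich({"event": "user.signup"})
print(sorted(enriched))  # ['event', 'node_id', 'processing_id', 'received_at']
```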

3. Batching with Archive

- archive:
    format: json_array

Groups individual events into JSON arrays. This creates batches rather than writing one file per event.

See: Archive Processor

4. Batch Metadata

- mapping: |
    root.batch_id = uuid_v4()
    root.batch_size = this.length()
    root.events = this

Wraps the batched events array with metadata:

  • batch_id: Unique identifier for this batch
  • batch_size: Number of events in the batch
  • events: The array of events
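
Together, the archive and wrap steps turn N individual events into one document; a Python model of the result (illustrative only):

```python
import uuid

def wrap_batch(events: list) -> dict:
    """Archive a list of events into a single batch document with metadata."""
    return {
        "batch_id": str(uuid.uuid4()),
        "batch_size": len(events),
        "events": events,
    }

doc = wrap_batch([{"event": "user.signup"}, {"event": "user.login"}])
print(doc["batch_size"])  # 2
```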

Output: File

output:
  file:
    path: /var/log/expanso/events/${!timestamp_unix()}.json

Writes each batch to a file with a Unix timestamp in the filename. The ${!timestamp_unix()} function interpolation generates unique filenames.

Example filename: /var/log/expanso/events/1706191234.json

See: File Output
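
The timestamped-filename pattern corresponds to this Python sketch (the real interpolation happens inside the pipeline):

```python
import time

def output_path(base="/var/log/expanso/events"):
    """Mimic ${!timestamp_unix()} interpolation in the output path."""
    return f"{base}/{int(time.time())}.json"

print(output_path())  # e.g. /var/log/expanso/events/1706191234.json
```

Note that the timestamp has second resolution, so two batches flushed within the same second would share a filename; how that collision is handled depends on the file output's configuration.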

Example Usage

Send a Test Event

curl -X POST http://localhost:8082/events \
  -H "Content-Type: application/json" \
  -d '{"event":"user.signup","user_id":"12345","timestamp":"2024-01-15T10:30:00Z"}'
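
The same request can be built in Python with the standard library (a sketch; it assumes the pipeline is listening on localhost:8082):

```python
import json
import urllib.request

def build_request(event: dict, url="http://localhost:8082/events"):
    """Construct a POST request carrying the event as a JSON body."""
    body = json.dumps(event).encode()
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )

req = build_request({"event": "user.signup", "user_id": "12345"})
# urllib.request.urlopen(req)  # expect HTTP 202 while the pipeline is running
print(req.get_method(), req.full_url)  # POST http://localhost:8082/events
```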

Expected Response

HTTP/1.1 202 Accepted
Content-Type: application/json

Resulting File Content

After batching, the output file might look like:

{
  "batch_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "batch_size": 1,
  "events": [
    {
      "event": "user.signup",
      "user_id": "12345",
      "timestamp": "2024-01-15T10:30:00Z",
      "received_at": "2024-01-15T10:30:01.234Z",
      "node_id": "edge-001",
      "processing_id": "b2c3d4e5-f6a7-8901-bcde-f12345678901"
    }
  ]
}

Common Variations

Filter Specific Events

Only process certain event types:

pipeline:
  processors:
    - mapping: |
        let parsed = this.parse_json()
        root = if parsed.exists() { parsed } else { deleted() }

    # Filter to specific event types
    - mapping: |
        root = if ["user.signup", "user.login"].contains(this.event) {
          this
        } else {
          deleted()
        }
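
The filter's effect, modeled in Python (event names taken from the example above; the helper itself is hypothetical):

```python
ALLOWED = {"user.signup", "user.login"}

def keep(event: dict) -> bool:
    """True if the event survives the filter, False if it is dropped."""
    return event.get("event") in ALLOWED

print(keep({"event": "user.signup"}))   # True
print(keep({"event": "system.boot"}))   # False
```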

Route to Multiple Outputs

Send different event types to different files:

output:
  switch:
    cases:
      - check: this.events.0.event.has_prefix("user.")
        output:
          file:
            path: /var/log/expanso/user-events/${!timestamp_unix()}.json

      - check: this.events.0.event.has_prefix("system.")
        output:
          file:
            path: /var/log/expanso/system-events/${!timestamp_unix()}.json

      - output:
          file:
            path: /var/log/expanso/other-events/${!timestamp_unix()}.json
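
The routing logic inspects the first event's name prefix; in Python terms (directory names mirror the config, the function is illustrative):

```python
def route(batch: dict) -> str:
    """Pick an output directory based on the first event's name prefix."""
    name = batch["events"][0].get("event", "")
    if name.startswith("user."):
        return "/var/log/expanso/user-events"
    if name.startswith("system."):
        return "/var/log/expanso/system-events"
    return "/var/log/expanso/other-events"

print(route({"events": [{"event": "user.signup"}]}))  # /var/log/expanso/user-events
```

Because only `events.0` is checked, a batch containing mixed event types is routed entirely by its first event.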

Adjust Batch Size

Control how many events are batched together:

pipeline:
  processors:
    # ... other processors ...

    # Batch events
    - archive:
        format: json_array

    - mapping: |
        root.batch_id = uuid_v4()
        root.batch_size = this.length()
        root.events = this

output:
  file:
    path: /var/log/expanso/events/${!timestamp_unix()}.json
    batching:
      count: 100
      period: 30s
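
The count/period pair flushes on whichever limit is hit first; a minimal Python model of that policy (illustrative, not the actual output implementation):

```python
import time

class Batcher:
    """Flush when `count` events accumulate or `period` seconds elapse."""

    def __init__(self, count=100, period=30.0):
        self.count, self.period = count, period
        self.events, self.started = [], time.monotonic()

    def add(self, event):
        """Buffer an event; return the flushed batch when a limit is hit."""
        self.events.append(event)
        if len(self.events) >= self.count or time.monotonic() - self.started >= self.period:
            return self.flush()
        return None

    def flush(self):
        batch, self.events = self.events, []
        self.started = time.monotonic()
        return batch

b = Batcher(count=3, period=60)
print(b.add(1), b.add(2))  # None None
print(b.add(3))            # [1, 2, 3]
```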

Next Steps