# Tailing Live Logs with Subprocess
The subprocess processor allows you to execute external commands and pipe data through them. One powerful use case is tailing live log files, enabling you to stream log data directly into your Expanso Edge pipeline for real-time processing, filtering, and routing.
## Why Use Subprocess for Tailing?
- Process log files in real-time without deploying dedicated log collectors
- Combine with Expanso processors to filter, enrich, parse, and route logs before they leave the edge
- Reduce bandwidth by processing and filtering at the source
- No external agents needed — just the `tail` command built into Linux/Unix systems
## Quick Start

Stream a live log file through a subprocess processor:

```yaml
input:
  generate:
    count: 0 # Run indefinitely
pipeline:
  processors:
    - subprocess:
        name: tail
        args:
          - "-F"
          - "${DATA_DIR:/app/data/sensor.log}"
    - parse_log:
        format: json
output:
  resource: my_output
```
**What's happening:**

- `generate` with `count: 0` starts the pipeline and keeps it running
- `subprocess` with `tail -F` runs the tail command, which outputs new log lines as they're written
- Each line from tail becomes a new message in the pipeline
- Subsequent processors (like `parse_log`) can transform the data
- The output resource receives the processed logs
## Configuration Details

### Basic Tail (No Logs on Startup)

Tail a file starting from the end (skip existing logs):

```yaml
subprocess:
  name: tail
  args:
    - "-f"
    - "/var/log/app.log"
```
### Follow by Name (Handles Log Rotation)

Use `-F` to follow the file by name, automatically picking up rotated logs:

```yaml
subprocess:
  name: tail
  args:
    - "-F"
    - "/var/log/app.log"
```

This is essential for production log files that get rotated. The `-f` flag follows the open file descriptor (so it keeps reading the old, rotated file), while `-F` follows the file name, reopening the log when it is recreated. In GNU tail, `-F` is shorthand for `--follow=name --retry`.
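The difference is easy to observe locally. A minimal sketch (GNU or BSD tail; all paths are throwaway `mktemp` locations):

```shell
# Show that `tail -F` keeps capturing lines after the file is rotated.
dir=$(mktemp -d)
log="$dir/app.log"
echo "line-1" > "$log"

tail -F "$log" > "$dir/captured.txt" 2>/dev/null &
tail_pid=$!
sleep 2                 # let tail attach and read line-1

mv "$log" "$log.1"      # simulate logrotate renaming the file
echo "line-2" > "$log"  # a fresh file appears under the original name
sleep 3                 # give tail time to notice the new file and reopen it

kill "$tail_pid"
wait "$tail_pid" 2>/dev/null
cat "$dir/captured.txt" # contains both line-1 and line-2
```

Run the same experiment with `-f` instead of `-F` and `line-2` never shows up, because tail is still reading the renamed file.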
### Start From Beginning

Tail all existing lines plus new ones:

```yaml
subprocess:
  name: tail
  args:
    - "-F"
    - "-n"
    - "+1" # Start from line 1
    - "/var/log/app.log"
```

Note that `+1` must be passed via `-n`; a bare `+1` alongside `-F` would be treated as a filename.
### Start From Specific Line Count

Tail the last 100 existing lines plus new ones:

```yaml
subprocess:
  name: tail
  args:
    - "-F"
    - "-n"
    - "100"
    - "/var/log/app.log"
```
### Dynamic Path with Environment Variables

Use `${ENV_VAR:default}` syntax to reference environment variables:

```yaml
subprocess:
  name: tail
  args:
    - "-F"
    - "${LOG_PATH:/var/log/app.log}"
```

Then set the environment variable when running Expanso Edge:

```shell
export LOG_PATH=/custom/path/logs/app.log
expanso start
```

Or in your configuration:

```yaml
environment:
  LOG_PATH: /custom/path/logs/app.log
```
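The `${ENV_VAR:default}` fallback behaves much like POSIX shell's `${VAR:-default}` expansion (note the spelling differs: Expanso uses `:`, the shell uses `:-`), which makes it easy to sanity-check a path before wiring it into the config:

```shell
# Fallback is used when LOG_PATH is unset; the variable wins when it is set
unset LOG_PATH
echo "${LOG_PATH:-/var/log/app.log}"   # → /var/log/app.log

LOG_PATH=/custom/path/logs/app.log
echo "${LOG_PATH:-/var/log/app.log}"   # → /custom/path/logs/app.log
```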
## Complete Examples

### Example 1: Parse and Route JSON Logs

Stream JSON logs, parse them, and route by severity:

```yaml
input:
  generate:
    count: 0
pipeline:
  processors:
    - subprocess:
        name: tail
        args:
          - "-F"
          - "/var/log/app.json"
    - parse_log:
        format: json
    - switch:
        - check: root.severity == "ERROR"
          processors:
            - mapping: root.routed_to = "error_channel"
        - check: root.severity == "WARN"
          processors:
            - mapping: root.routed_to = "warning_channel"
        - processors:
            - mapping: root.routed_to = "info_channel"
output:
  broker:
    pattern: round_robin
    outputs:
      - label: errors
        filter:
          check: root.routed_to == "error_channel"
        s3:
          bucket: my-bucket
          key: errors/${timestamp_unix}.json
      - label: warnings
        filter:
          check: root.routed_to == "warning_channel"
        s3:
          bucket: my-bucket
          key: warnings/${timestamp_unix}.json
      - label: info
        filter:
          check: root.routed_to == "info_channel"
        s3:
          bucket: my-bucket
          key: info/${timestamp_unix}.json
```
### Example 2: Extract Fields and Batch Upload

Tail logs, parse structured data, enrich, and batch for efficient upload:

```yaml
input:
  generate:
    count: 0
pipeline:
  threads: 2
  processors:
    - subprocess:
        name: tail
        args:
          - "-F"
          - "/opt/data/sensor.log"
    - parse_log:
        format: json
    - mapping: |
        root.timestamp = now("2006-01-02T15:04:05Z07:00")
        root.node_id = env("NODE_ID").or("unknown")
        root.region = env("REGION").or("us-east-1")
output:
  batch:
    condition:
      type: count
      count: 100
    output:
      s3:
        bucket: my-bucket
        key: logs/year=${year}/month=${month}/day=${day}/${timestamp_unix}.jsonl
        codec: jsonlines
```
### Example 3: Real-time Alert on Error Patterns

Tail logs and trigger alerts on specific error patterns:

```yaml
input:
  generate:
    count: 0
pipeline:
  processors:
    - subprocess:
        name: tail
        args:
          - "-F"
          - "/var/log/application.log"
    - regex:
        pattern: 'ERROR.*OutOfMemory|ERROR.*StackOverflow'
    - mapping: |
        root.alert = true
        root.severity = "critical"
        root.timestamp = now()
        root.host = hostname()
output:
  broker:
    pattern: round_robin
    outputs:
      - label: critical_errors
        filter:
          check: root.alert == true
        http_client:
          url: "https://alerts.example.com/webhook"
          verb: POST
      - label: file_backup
        file:
          path: /var/log/expanso-critical-errors.jsonl
```
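Before deploying, you can sanity-check the alert pattern against sample lines with `grep -E`, which supports the same alternation syntax as the pattern above (the sample log lines here are invented for illustration):

```shell
# Two of the three sample lines should match the alert pattern
printf '%s\n' \
  'ERROR java.lang.OutOfMemoryError: Java heap space' \
  'WARN request took 1200ms' \
  'ERROR java.lang.StackOverflowError at depth 9000' \
  | grep -E 'ERROR.*OutOfMemory|ERROR.*StackOverflow'
```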
## Common Tail Flags Reference

| Flag | Description | Example |
|---|---|---|
| `-f` | Follow the open file descriptor (stops tracking after rotation) | `tail -f app.log` |
| `-F` | Follow by name with retry (survives rotation) | `tail -F app.log` |
| `-n N` | Show last N lines | `tail -n 50 app.log` |
| `-n +N` | Show from line N onward | `tail -n +1 app.log` |
| `--pid=PID` | Terminate when PID dies | `tail -F --pid=$$ app.log` |
| `-q` | Suppress file-name headers when tailing multiple files | `tail -q -F app.log other.log` |
| `--retry` | Retry if file is inaccessible (implied by `-F`) | `tail -F --retry app.log` |
## Performance Considerations

### 1. Output Rate vs Processing Speed

If logs arrive faster than your pipeline can process them, consider adding threads:

```yaml
pipeline:
  threads: 4 # Use multiple threads to increase throughput
  processors:
    - subprocess:
        name: tail
        args: ["-F", "/var/log/app.log"]
```
### 2. Buffer Size

For very large log lines, increase the subprocess buffer:

```yaml
subprocess:
  name: tail
  args:
    - "-F"
    - "/var/log/app.log"
  max_buffer: 262144 # Increase from the default 65536 (64KB) to 256KB
```
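To pick a sensible `max_buffer`, it can help to measure the longest line in an existing log first. A rough awk one-liner, shown here against a throwaway sample file:

```shell
# Print the length of the longest line — size max_buffer comfortably above it
sample=$(mktemp)
printf 'short\na-much-longer-line-of-text\nmid-size-line\n' > "$sample"
awk '{ if (length($0) > max) max = length($0) } END { print max }' "$sample"
# → 26
```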
### 3. Batching for Efficiency

Batch logs before writing to external systems:

```yaml
output:
  batch:
    condition:
      type: interval
      interval: 5s
    output:
      http_client:
        url: "https://ingest.example.com/logs"
```
## Production Readiness

### Subprocess Restart Behavior

The subprocess processor is designed to keep long-running processes alive:

- If tail exits early, Expanso Edge automatically restarts it
- Restart is transparent — the pipeline continues processing new logs
- Minimal data loss — the `-F` flag lets tail reattach to the log after rotation or recreation (lines written while tail itself is down are not replayed, since tail keeps no state across restarts)
- Exponential backoff — prevents rapid restart loops if there's a persistent issue
Important: make tail itself as resilient as possible with appropriate flags:

```yaml
subprocess:
  name: tail
  args:
    - "-F" # Follow by name (survives rotation)
    - "--retry" # Retry if file temporarily unavailable (implied by -F in GNU tail)
    - "/var/log/app.log"
```
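The retry behavior is easy to observe by starting tail before the file exists (GNU tail shown; paths are throwaway `mktemp` locations):

```shell
# tail waits for the file to appear, then follows it
dir=$(mktemp -d)
tail -F --retry "$dir/app.log" > "$dir/captured.txt" 2>/dev/null &
tail_pid=$!
sleep 2                        # file does not exist yet; tail keeps retrying
echo "first-line" > "$dir/app.log"
sleep 3                        # tail notices the new file and reads it
kill "$tail_pid"
wait "$tail_pid" 2>/dev/null
grep "first-line" "$dir/captured.txt"
```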
### Error Handling & Recovery Patterns

#### Pattern 1: Fall Back to Batch Processing

If tail fails (file deleted, permissions lost), fall back to reading existing logs:

```yaml
pipeline:
  processors:
    - try:
        - subprocess:
            name: tail
            args:
              - "-F"
              - "${LOG_FILE:/var/log/app.log}"
        - mapping: root.source = "tail"
    - catch:
        - file:
            glob: "${LOG_FILE:/var/log/app.log}"
        - mapping: |
            root.source = "batch_fallback"
            root.recovered = true
```

This ensures that if tail fails to start or crashes repeatedly, the pipeline switches to reading the file in batches instead of going silent.
#### Pattern 2: Health Check with Heartbeat

Emit a periodic heartbeat alongside the tailed lines to monitor tail health. The heartbeat must be written to stdout so it enters the pipeline as a message:

```yaml
input:
  generate:
    count: 0
pipeline:
  processors:
    - subprocess:
        name: bash
        args:
          - "-c"
          - |
            tail -F /var/log/app.log &
            while sleep 60; do
              echo "{\"heartbeat\":true,\"timestamp\":\"$(date -u +%Y-%m-%dT%H:%M:%SZ)\"}"
            done
    - mapping: |
        root.collected_at = now()
output:
  broker:
    outputs:
      - label: live_logs
        filter:
          check: root.heartbeat != true
        # Your log output here
      - label: health
        filter:
          check: root.heartbeat == true
        http_client:
          url: "https://monitoring.example.com/health"
          verb: POST
```
#### Pattern 3: Wrap Tail with a Health Wrapper Script

For maximum reliability, wrap tail in a script that handles cleanup:

`/usr/local/bin/tail-wrapper.sh`:

```bash
#!/bin/bash
set -e
LOG_FILE="${1:?LOG_FILE required}"
HEALTH_FILE="${2:-/tmp/tail-health.txt}"

# Signal handler for clean shutdown
cleanup() {
  rm -f "$HEALTH_FILE"
  exit 0
}
trap cleanup EXIT INT TERM

# Follow the file; tail's stdout becomes the script's stdout
tail -F "$LOG_FILE" &
TAIL_PID=$!

# Write a health marker every 10 seconds while tail is alive
while kill -0 "$TAIL_PID" 2>/dev/null; do
  date -u +%s > "$HEALTH_FILE"
  sleep 10
done

wait "$TAIL_PID"
```
Then use it in your pipeline:

```yaml
subprocess:
  name: /usr/local/bin/tail-wrapper.sh
  args:
    - "/var/log/app.log"
    - "/tmp/tail-${NODE_ID}.health"
```
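A monitoring job can then treat a stale health marker as a sign that tail has died. A hypothetical check (the 30-second threshold is illustrative, and the marker is created inline here where the wrapper would normally write it):

```shell
# Consider the wrapper healthy if its marker was written in the last 30 seconds
health_file=$(mktemp)
date -u +%s > "$health_file"    # in production, tail-wrapper.sh writes this file
now=$(date -u +%s)
last=$(cat "$health_file" 2>/dev/null || echo 0)
if [ $((now - last)) -lt 30 ]; then status=healthy; else status=stale; fi
echo "$status"                  # → healthy
```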
### Preventing Data Loss

#### 1. Atomic Output Writes

Always use transactional or batch outputs:

```yaml
output:
  batch:
    condition:
      type: count
      count: 100 # Don't send partial batches
    output:
      s3:
        bucket: logs
        key: batch-${timestamp_unix}.jsonl
```
#### 2. Retry on Failure

Add retry logic to your outputs:

```yaml
output:
  retry:
    max_retries: 3
    backoff:
      initial_interval: 1s
      max_interval: 30s
      multiplier: 2
    output:
      http_client:
        url: "https://ingest.example.com"
        timeout: 10s
```
#### 3. Local Backup

Always maintain a local backup alongside cloud/remote storage:

```yaml
output:
  broker:
    pattern: fan_out # deliver every message to both outputs
    outputs:
      - label: primary
        http_client:
          url: "https://ingest.example.com"
          retry:
            max_retries: 3
      - label: backup
        file:
          path: /var/log/expanso-backup-${timestamp_unix}.jsonl
```

Note the `fan_out` pattern: a round-robin broker would alternate messages between the outputs, leaving each destination with only half the logs.
### Resource Limits

Prevent tail from consuming excessive memory or CPU:

```yaml
pipeline:
  threads: 1 # One thread is enough for a subprocess (tail is single-threaded anyway)
  processors:
    - subprocess:
        name: tail
        args:
          - "-F"
          - "/var/log/app.log"
        max_buffer: 65536 # Default is good; increase only if needed
```
Then enforce OS-level limits:

```shell
# Limit the Expanso Edge process to 512MB RAM, 50% of one CPU
# (use MemoryLimit= instead of MemoryMax= on older, cgroup-v1 systemd versions)
systemctl set-property expanso.service MemoryMax=512M
systemctl set-property expanso.service CPUQuota=50%
```