# Tailing Live Logs with Subprocess
The subprocess processor allows you to execute external commands and pipe data through them. One powerful use case is tailing live log files, enabling you to stream log data directly into your Expanso Edge pipeline for real-time processing, filtering, and routing.
## Why Use Subprocess for Tailing?
- Process log files in real-time without deploying dedicated log collectors
- Combine with Expanso processors to filter, enrich, parse, and route logs before they leave the edge
- Reduce bandwidth by processing and filtering at the source
- No external agents needed, just the `tail` command built into Linux/Unix systems
## Quick Start
Stream a live log file through a subprocess processor:
```yaml
input:
  generate:
    count: 0 # Run indefinitely
pipeline:
  processors:
    - subprocess:
        name: tail
        args:
          - "-F"
          - "${DATA_DIR:/app/data/sensor.log}"
    - parse_log:
        format: json
output:
  resource: my_output
```
**What's happening:**

- `generate` with `count: 0` creates a single message to start the pipeline
- `subprocess` with `tail -F` runs the tail command, which outputs new log lines as they're written
- Each line from tail becomes a new message in the pipeline
- Subsequent processors (like `parse_log`) can transform the data
- The output resource receives the processed logs
## Configuration Details

### Basic Tail (No Logs on Startup)

Tail a file starting from the end (skip existing logs):

```yaml
subprocess:
  name: tail
  args:
    - "-f"
    - "/var/log/app.log"
```
### Follow by Name (Handles Log Rotation)

Use `-F` to follow the file by name, automatically picking up rotated logs:

```yaml
subprocess:
  name: tail
  args:
    - "-F"
    - "/var/log/app.log"
```

This is essential for production log files that get rotated. The `-f` flag follows the open file descriptor, so after rotation it keeps reading the old, renamed file. `-F` follows the file by name and re-opens it when it is replaced (it is shorthand for `--follow=name --retry`), so it continues after rotation.
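The difference is easy to observe with a small shell experiment (file paths under `/tmp` are arbitrary) that rotates a file while `tail -F` is following it:

```shell
# Demonstrate that tail -F keeps following after rotation
LOG=/tmp/rotate-demo.log
OUT=/tmp/rotate-demo.out
rm -f "$LOG" "$LOG.1" "$OUT"
touch "$LOG"

tail -F "$LOG" > "$OUT" 2>/dev/null &
TAIL_PID=$!
sleep 1

echo "before-rotation" >> "$LOG"
sleep 1
mv "$LOG" "$LOG.1"               # simulate logrotate renaming the file
echo "after-rotation" >> "$LOG"  # a new file appears under the old name
sleep 3                          # give tail time to re-open the new file

kill "$TAIL_PID"
cat "$OUT"
```

With GNU tail, both lines end up in the output file; with plain `-f`, only the line written before the rename would.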
### Start From Beginning

Tail all existing lines plus new ones (note that modern tail requires `-n +1`, not the obsolete bare `+1`):

```yaml
subprocess:
  name: tail
  args:
    - "-F"
    - "-n"
    - "+1" # Start from line 1
    - "/var/log/app.log"
```
### Start From Specific Line Count

Tail the last 100 existing lines plus new ones:

```yaml
subprocess:
  name: tail
  args:
    - "-F"
    - "-n"
    - "100"
    - "/var/log/app.log"
```
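The `-n` semantics are easy to verify from a shell with a throwaway sample file:

```shell
# Create a small sample file to demonstrate tail's -n flag
printf 'line1\nline2\nline3\nline4\nline5\n' > /tmp/tail-demo.log

# Last 2 lines
tail -n 2 /tmp/tail-demo.log

# From line 4 onward (note the +)
tail -n +4 /tmp/tail-demo.log
```

Both commands print `line4` and `line5` here: `-n N` counts back from the end, while `-n +N` counts forward from the start.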
### Dynamic Path with Environment Variables

Use `${ENV_VAR:default}` syntax to reference environment variables:

```yaml
subprocess:
  name: tail
  args:
    - "-F"
    - "${LOG_PATH:/var/log/app.log}"
```
Then set the environment variable when running Expanso Edge:
```bash
export LOG_PATH=/custom/path/logs/app.log
expanso start
```
Or in your configuration:
```yaml
environment:
  LOG_PATH: /custom/path/logs/app.log
```
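Note that the `${VAR:default}` form is Expanso's interpolation syntax; the shell's own default expansion uses `:-`, so if you dry-run a path from a shell first, write it like this:

```shell
# Shell default expansion uses :- (Expanso's interpolation uses :)
unset LOG_PATH
echo "${LOG_PATH:-/var/log/app.log}"

export LOG_PATH=/custom/path/logs/app.log
echo "${LOG_PATH:-/var/log/app.log}"
```

This prints `/var/log/app.log` first (variable unset, default applies) and `/custom/path/logs/app.log` second.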
## Complete Examples

### Example 1: Parse and Route JSON Logs

Stream JSON logs, parse them, and route by severity:
```yaml
input:
  generate:
    count: 0
pipeline:
  processors:
    - subprocess:
        name: tail
        args:
          - "-F"
          - "/var/log/app.json"
    - parse_log:
        format: json
    - switch:
        - check: root.severity == "ERROR"
          processors:
            - mapping: root.routed_to = "error_channel"
        - check: root.severity == "WARN"
          processors:
            - mapping: root.routed_to = "warning_channel"
        - processors:
            - mapping: root.routed_to = "info_channel"
output:
  broker:
    pattern: round_robin
    outputs:
      - label: errors
        filter:
          check: root.routed_to == "error_channel"
        s3:
          bucket: my-bucket
          key: errors/${timestamp_unix}.json
      - label: warnings
        filter:
          check: root.routed_to == "warning_channel"
        s3:
          bucket: my-bucket
          key: warnings/${timestamp_unix}.json
      - label: info
        filter:
          check: root.routed_to == "info_channel"
        s3:
          bucket: my-bucket
          key: info/${timestamp_unix}.json
```
### Example 2: Extract Fields and Batch Upload

Tail logs, parse structured data, enrich, and batch for efficient upload:
```yaml
input:
  generate:
    count: 0
pipeline:
  threads: 2
  processors:
    - subprocess:
        name: tail
        args:
          - "-F"
          - "/opt/data/sensor.log"
    - parse_log:
        format: json
    - mapping: |
        root.timestamp = now("2006-01-02T15:04:05Z07:00")
        root.node_id = env("NODE_ID").or("unknown")
        root.region = env("REGION").or("us-east-1")
output:
  batch:
    condition:
      type: count
      count: 100
    output:
      s3:
        bucket: my-bucket
        key: logs/year=${year}/month=${month}/day=${day}/${timestamp_unix}.jsonl
        codec: jsonlines
```
### Example 3: Real-time Alert on Error Patterns

Tail logs and trigger alerts on specific error patterns:
```yaml
input:
  generate:
    count: 0
pipeline:
  processors:
    - subprocess:
        name: tail
        args:
          - "-F"
          - "/var/log/application.log"
    - regex:
        pattern: 'ERROR.*OutOfMemory|ERROR.*StackOverflow'
    - mapping: |
        root.alert = true
        root.severity = "critical"
        root.timestamp = now()
        root.host = hostname()
output:
  broker:
    pattern: round_robin
    outputs:
      - label: critical_errors
        filter:
          check: root.alert == true
        http_client:
          url: "https://alerts.example.com/webhook"
          verb: POST
      - label: file_backup
        file:
          path: /var/log/expanso-critical-errors.jsonl
```
## Common Tail Flags Reference

| Flag | Description | Example |
|---|---|---|
| `-f` | Follow the open file descriptor (breaks on rotation) | `tail -f app.log` |
| `-F` | Follow by name with retry (survives rotation) | `tail -F app.log` |
| `-n N` | Show last N lines | `tail -n 50 app.log` |
| `-n +N` | Show from line N onward | `tail -n +1 app.log` |
| `--pid=PID` | Terminate when process PID dies | `tail -F --pid=$$ app.log` |
| `-q` | Suppress file-name headers | `tail -q -F app.log` |
| `--retry` | Keep trying if the file is inaccessible | `tail -F --retry app.log` |
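A quick way to see `-q` in action (sample files under `/tmp` are arbitrary): when tail is given multiple files, it prints `==> name <==` headers unless `-q` is supplied.

```shell
# Create two tiny files to compare tail's multi-file output
printf 'alpha\n' > /tmp/tail-q1.log
printf 'beta\n'  > /tmp/tail-q2.log

# Default: headers identify each file
tail -n 1 /tmp/tail-q1.log /tmp/tail-q2.log

# -q: just the data lines
tail -q -n 1 /tmp/tail-q1.log /tmp/tail-q2.log
```

The first command prints a header before each file's lines; the second prints only `alpha` and `beta`. Suppressing headers matters in a pipeline, where each emitted line becomes a message.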
## Performance Considerations

### 1. Output Rate vs Processing Speed

If logs arrive faster than your pipeline can process them, consider adding threads:

```yaml
pipeline:
  threads: 4 # Use multiple threads to increase throughput
  processors:
    - subprocess:
        name: tail
        args: ["-F", "/var/log/app.log"]
```
### 2. Buffer Size

For very large log lines, increase the subprocess buffer:

```yaml
subprocess:
  name: tail
  args:
    - "-F"
    - "/var/log/app.log"
  max_buffer: 262144 # Increase from the default 65536 (64KB) to 256KB
```
### 3. Batching for Efficiency

Batch logs before writing to external systems:

```yaml
output:
  batch:
    condition:
      type: interval
      interval: 5s
    output:
      http_client:
        url: "https://ingest.example.com/logs"
```
## Production Readiness

### Subprocess Restart Behavior

The subprocess processor is designed to keep long-running processes alive:

- If tail exits early, Expanso Edge automatically restarts it
- Restart is transparent: the pipeline continues processing new logs
- Minimal data loss: with `-F`, a restarted tail re-opens the current log file by name and resumes following it
- Exponential backoff prevents rapid restart loops if there's a persistent issue
**Important:** Give tail flags that keep it running through rotation and transient errors:

```yaml
subprocess:
  name: tail
  args:
    - "-F" # Follow by name (survives rotation)
    - "--retry" # Retry if file temporarily unavailable
    - "/var/log/app.log"
```
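The effect of `--retry` (which `-F` already implies) is easy to observe from a shell: tail can be started before the log file even exists and will pick it up once it appears. A throwaway demo under `/tmp`:

```shell
rm -f /tmp/retry-demo.log /tmp/retry-demo.out

# Start tailing a file that doesn't exist yet
tail -F /tmp/retry-demo.log > /tmp/retry-demo.out 2>/dev/null &
TAIL_PID=$!
sleep 1

# Create the file after the fact; tail notices and follows it
echo "late arrival" >> /tmp/retry-demo.log
sleep 3

kill "$TAIL_PID"
cat /tmp/retry-demo.out
```

With GNU tail, `late arrival` shows up in the output despite the file not existing at startup; plain `-f` without retry would have exited immediately instead.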
## Error Handling & Recovery Patterns

### Pattern 1: Fall Back to Batch Processing

If tail fails (file deleted, permissions lost), fall back to reading existing logs:
```yaml
pipeline:
  processors:
    - try:
        - subprocess:
            name: tail
            args:
              - "-F"
              - "${LOG_FILE:/var/log/app.log}"
        - mapping: root.source = "tail"
    - catch:
        - file:
            glob: "${LOG_FILE:/var/log/app.log}"
        - mapping: |
            root.source = "batch_fallback"
            root.recovered = true
```
This ensures that if tail fails to start or crashes repeatedly, the pipeline switches to reading the file in batches instead of going silent.
### Pattern 2: Health Check with Heartbeat

Add a heartbeat log to monitor tail health:
```yaml
input:
  generate:
    count: 0
pipeline:
  processors:
    - subprocess:
        name: bash
        args:
          - "-c"
          - "tail -F /var/log/app.log & echo '{\"heartbeat\":true,\"timestamp\":\"'$(date -u +%Y-%m-%dT%H:%M:%SZ)'\"}' >> /var/log/expanso-heartbeat.log; wait"
    - mapping: |
        root.collected_at = now()
output:
  broker:
    outputs:
      - label: live_logs
        filter:
          check: root.heartbeat != true
        # Your log output here
      - label: health
        filter:
          check: root.heartbeat == true
        http_client:
          url: "https://monitoring.example.com/health"
          verb: POST
```
### Pattern 3: Wrap Tail with a Health Wrapper Script

For maximum reliability, wrap tail in a script that handles cleanup:

`/usr/local/bin/tail-wrapper.sh`:
```bash
#!/bin/bash
set -e

LOG_FILE="${1:?LOG_FILE required}"
HEALTH_FILE="${2:-/tmp/tail-health.txt}"

# Signal handler for clean shutdown
cleanup() {
  rm -f "$HEALTH_FILE"
  exit 0
}
trap cleanup EXIT INT TERM

# Follow the file in the background
tail -F "$LOG_FILE" &
TAIL_PID=$!

# Write a health marker every 10 seconds while tail is alive
while kill -0 "$TAIL_PID" 2>/dev/null; do
  date -u +%s > "$HEALTH_FILE"
  sleep 10
done

wait "$TAIL_PID"
```
Then use it in your pipeline:

```yaml
subprocess:
  name: /usr/local/bin/tail-wrapper.sh
  args:
    - "/var/log/app.log"
    - "/tmp/tail-${NODE_ID}.health"
```
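The `kill -0` probe the wrapper's loop relies on sends no signal at all; it only checks whether the PID exists. You can verify its behavior in isolation:

```shell
# kill -0 succeeds while the process is alive, fails once it is gone
sleep 30 &
PID=$!

kill -0 "$PID" 2>/dev/null && echo "alive"

kill "$PID"                       # terminate it
wait "$PID" 2>/dev/null || true   # reap so the PID is truly gone

kill -0 "$PID" 2>/dev/null || echo "gone"
```

This prints `alive` and then `gone`, which is exactly the transition that stops the wrapper's health-marker loop.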
## Preventing Data Loss

### 1. Atomic Output Writes

Always use transactional or batch outputs:

```yaml
output:
  batch:
    condition:
      type: count
      count: 100 # Don't send partial batches
    output:
      s3:
        bucket: logs
        key: batch-${timestamp_unix}.jsonl
```
### 2. Retry on Failure

Add retry logic to your outputs:

```yaml
output:
  retry:
    max_retries: 3
    backoff:
      initial_interval: 1s
      max_interval: 30s
      multiplier: 2
    output:
      http_client:
        url: "https://ingest.example.com"
        timeout: 10s
```
### 3. Local Backup

Always maintain a local backup alongside cloud/remote storage:

```yaml
output:
  broker:
    pattern: round_robin
    outputs:
      - label: primary
        http_client:
          url: "https://ingest.example.com"
          retry:
            max_retries: 3
      - label: backup
        file:
          path: /var/log/expanso-backup-${timestamp_unix}.jsonl
```
## Resource Limits

Prevent tail from consuming excessive memory or CPU:

```yaml
pipeline:
  threads: 1 # One thread is enough; tail is single-threaded anyway
  processors:
    - subprocess:
        name: tail
        args:
          - "-F"
          - "/var/log/app.log"
        max_buffer: 65536 # Default is fine; increase only if needed
```
Then enforce OS-level limits:

```bash
# Limit the Expanso Edge process to 512MB RAM and 50% of one CPU
systemctl set-property expanso.service MemoryLimit=512M
systemctl set-property expanso.service CPUQuota=50%
```
## Monitoring Tail Health

### Check Logs for Restarts

Watch Expanso logs for subprocess restart messages:

```bash
journalctl -u expanso.service -f | grep -i "subprocess\|restart\|error"
```
### Verify File Permissions

Regular checks prevent permission-related failures:

```bash
# Add to crontab
0 * * * * /usr/local/bin/check-log-perms.sh
```

`/usr/local/bin/check-log-perms.sh`:

```bash
#!/bin/bash
LOG_FILE="/var/log/app.log"
EXPANSO_USER="expanso"

# Check readability as the Expanso user (a root cron job can read anything,
# so testing as root would never fail)
if ! sudo -u "$EXPANSO_USER" test -r "$LOG_FILE"; then
  echo "ERROR: $EXPANSO_USER cannot read $LOG_FILE" | logger
  exit 1
fi
```
### Monitor File Handle Limits

If tail fails because it can't open files, check and raise the open-file limit:

```bash
# Check current limits
ulimit -n
```

```ini
# Set a higher limit in the systemd service file
[Service]
LimitNOFILE=65536
```
### Alert on Tail Failure

Create a secondary pipeline to detect when tail stops working:

```yaml
input:
  file:
    glob: /tmp/tail-health.txt
  processors:
    - mapping: |
        # The health file contains an epoch-seconds timestamp
        let age_seconds = now().ts_unix() - content().string().trim().number()
        root.is_stale = $age_seconds > 30
output:
  broker:
    outputs:
      - label: alert_if_stale
        filter:
          check: root.is_stale == true
        http_client:
          url: "https://alerts.example.com/critical"
          verb: POST
          body: 'Tail process appears stuck or crashed'
```
## Troubleshooting

### "Permission denied" errors

Ensure the Expanso Edge process has read permissions on the log file:

```bash
# Check permissions
ls -la /var/log/app.log

# Run Expanso with appropriate user/group
sudo -u loguser expanso start
```
### Subprocess hangs or no output

Check that:

- The log file path is correct
- The file actually exists and is being written to
- You use `-F` instead of `-f` in production (handles rotation)
- You add `--retry` for reliability:

```yaml
subprocess:
  name: tail
  args:
    - "-F"
    - "--retry"
    - "/var/log/app.log"
```
### High CPU usage
If subprocess is consuming too much CPU:
- Reduce pipeline threads
- Check if the log file is being hammered with writes
- Consider filtering earlier in the pipeline to reduce downstream processing
## Tail with Other Commands

The subprocess processor works with any command, not just tail. You can combine commands:

```yaml
# Use tail + grep to filter in-process
subprocess:
  name: bash
  args:
    - "-c"
    - "tail -F /var/log/app.log | grep --line-buffered ERROR"
```

Note the `--line-buffered` flag: without it, grep block-buffers its output when writing to a pipe, so matches can be delayed indefinitely on a slow log.
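You can sanity-check the filter stage on its own with a fixed input before wiring it to a live tail:

```shell
# Filter only ERROR lines, as the in-process grep does. --line-buffered
# flushes each match immediately instead of batching, which matters when
# the upstream input never ends (as with tail -F).
printf 'INFO started\nERROR disk full\nINFO done\n' \
  | grep --line-buffered ERROR
```

Only `ERROR disk full` passes through, so downstream processors see just the lines you care about.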
## Next Steps
- Subprocess Processor — Full configuration reference
- Log Processing Examples — Production pipeline patterns
- Parse Log Processor — Parse common log formats
- Mapping Processor — Transform parsed data
- Error Handling — Handle failures gracefully