Tailing Live Logs with Subprocess

The subprocess processor allows you to execute external commands and pipe data through them. One powerful use case is tailing live log files, enabling you to stream log data directly into your Expanso Edge pipeline for real-time processing, filtering, and routing.

Why Use Subprocess for Tailing?

  • Process log files in real-time without deploying dedicated log collectors
  • Combine with Expanso processors to filter, enrich, parse, and route logs before they leave the edge
  • Reduce bandwidth by processing and filtering at the source
  • No external agents needed — just the tail command built into Linux/Unix systems

Quick Start

Stream a live log file through a subprocess processor:

input:
  generate:
    count: 0 # Run indefinitely

pipeline:
  processors:
    - subprocess:
        name: tail
        args:
          - "-F"
          - "${DATA_DIR:/app/data/sensor.log}"
    - parse_log:
        format: json

output:
  resource: my_output

What's happening:

  1. generate with count: 0 starts the pipeline; a count of 0 means the input never shuts down, so the pipeline keeps running
  2. subprocess with tail -F runs the tail command, which outputs new log lines as they're written
  3. Each line from tail becomes a new message in the pipeline
  4. Subsequent processors (like parse_log) can transform the data
  5. The output resource receives the processed logs

Configuration Details

Basic Tail (No Logs on Startup)

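Tail a file starting from the end, skipping existing lines. By default tail prints the last 10 lines before it starts following, so pass -n 0:

subprocess:
  name: tail
  args:
    - "-n"
    - "0" # Emit no existing lines, only new ones
    - "-f"
    - "/var/log/app.log"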

Follow by Name (Handles Log Rotation)

Use -F to follow the file by name, so tail reopens the path when the log is rotated or recreated:

subprocess:
  name: tail
  args:
    - "-F"
    - "/var/log/app.log"

This is essential for production log files that get rotated. With GNU tail, -f follows the open file descriptor (--follow=descriptor): after a rename-based rotation it keeps reading the old, renamed file and never sees the new one. -F is shorthand for --follow=name --retry: it watches the path and reopens it when a new file appears there.
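You can watch the difference on a scratch file. The sketch below (assuming GNU coreutils tail on Linux; the sleep durations are guesses, not guarantees) follows the same file with -f and -F, rotates it the way logrotate does by default (rename the file, then create a new one at the same path), and prints what each follower received. rotation_demo is an illustrative name.

```shell
#!/usr/bin/env bash
# Compare tail -f and tail -F across a rename-style rotation.
# Assumes GNU coreutils tail; sleep durations are illustrative.
set -euo pipefail

rotation_demo() {
  local dir
  dir=$(mktemp -d)
  : > "$dir/app.log"

  # Start both followers at the end of the empty file; -s 0.2 shortens the polling interval
  tail -n 0 -s 0.2 -f "$dir/app.log" > "$dir/descriptor.out" 2>/dev/null &
  local f_pid=$!
  tail -n 0 -s 0.2 -F "$dir/app.log" > "$dir/name.out" 2>/dev/null &
  local F_pid=$!
  sleep 1

  echo "before" >> "$dir/app.log"      # written to the original file
  sleep 1
  mv "$dir/app.log" "$dir/app.log.1"   # rotate: rename the current file...
  echo "after" > "$dir/app.log"        # ...and create a fresh one at the same path
  sleep 2

  kill "$f_pid" "$F_pid" 2>/dev/null || true
  wait "$f_pid" "$F_pid" 2>/dev/null || true

  # One line per mode, listing everything that follower printed
  printf 'descriptor (-f): %s\n' "$(paste -sd ' ' "$dir/descriptor.out")"
  printf 'name (-F): %s\n' "$(paste -sd ' ' "$dir/name.out")"
  rm -rf "$dir"
}

rotation_demo
```

With GNU tail, expect the -f line to show only before, and the -F line to show before after.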

Start From Beginning

Tail all existing lines plus new ones (-n +1 starts output at line 1):

subprocess:
  name: tail
  args:
    - "-F"
    - "-n"
    - "+1" # Start from line 1
    - "/var/log/app.log"

Start From Specific Line Count

Tail the last 100 existing lines plus new ones:

subprocess:
  name: tail
  args:
    - "-F"
    - "-n"
    - "100"
    - "/var/log/app.log"
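The -n values in these configurations are passed straight to tail, so you can preview each startup window by running tail without -f/-F, in which case it prints the window and exits. A minimal sketch against a throwaway five-line file (startup_window_demo is an illustrative name):

```shell
#!/usr/bin/env bash
# Show which existing lines tail prints at startup for different -n values.
set -euo pipefail

startup_window_demo() {
  local log n
  log=$(mktemp)
  printf 'line%s\n' 1 2 3 4 5 > "$log"   # five existing lines

  # Without -f/-F, tail prints only the startup window and exits
  for n in 0 2 +1 +4; do
    printf -- '-n %s: [%s]\n' "$n" "$(tail -n "$n" "$log" | paste -sd ' ' -)"
  done
  rm -f "$log"
}

startup_window_demo
```

-n 2 keeps the last two lines, -n +1 and -n +4 start at those line numbers, and -n 0 emits nothing until new lines arrive.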

Dynamic Path with Environment Variables

Use ${ENV_VAR:default} syntax to reference environment variables:

subprocess:
  name: tail
  args:
    - "-F"
    - "${LOG_PATH:/var/log/app.log}"

Then set the environment variable when running Expanso Edge:

export LOG_PATH=/custom/path/logs/app.log
expanso start

Or in your configuration:

environment:
  LOG_PATH: /custom/path/logs/app.log
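Before starting Expanso Edge, you can run a quick shell pre-flight to confirm that the path LOG_PATH resolves to exists and is readable. This is a hypothetical helper, not part of Expanso Edge. Note that the shell spells a default as ${LOG_PATH:-/var/log/app.log} (with a hyphen), unlike the ${LOG_PATH:/var/log/app.log} form used in the configuration.

```shell
#!/usr/bin/env bash
# Hypothetical pre-flight check, run before `expanso start`.
# Shell default syntax is ${VAR:-default}; the Expanso config uses ${VAR:default}.
set -euo pipefail

resolve_log_path() {
  # Same default as the config; the shell applies it when LOG_PATH is unset or empty
  printf '%s\n' "${LOG_PATH:-/var/log/app.log}"
}

check_log_path() {
  local path
  path=$(resolve_log_path)
  if [ ! -e "$path" ]; then
    echo "missing: $path" >&2
    return 1
  elif [ ! -r "$path" ]; then
    echo "not readable by $(id -un): $path" >&2
    return 1
  fi
  echo "ok: $path"
}
```

Usage: check_log_path && expanso start. Run it as the same user Expanso Edge runs as, since readability is checked for the current user.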

Complete Examples

Example 1: Parse and Route JSON Logs

Stream JSON logs, parse them, and route by severity:

input:
  generate:
    count: 0

pipeline:
  processors:
    - subprocess:
        name: tail
        args:
          - "-F"
          - "/var/log/app.json"
    - parse_log:
        format: json
    - switch:
        - check: root.severity == "ERROR"
          processors:
            - mapping: root.routed_to = "error_channel"
        - check: root.severity == "WARN"
          processors:
            - mapping: root.routed_to = "warning_channel"
        - processors:
            - mapping: root.routed_to = "info_channel"

output:
  broker:
    pattern: fan_out # Every output sees every message; each filter keeps its own channel (round_robin would hand each message to only one output)
    outputs:
      - label: errors
        filter:
          check: root.routed_to == "error_channel"
        s3:
          bucket: my-bucket
          key: 'errors/${! timestamp_unix_nano() }.json'
      - label: warnings
        filter:
          check: root.routed_to == "warning_channel"
        s3:
          bucket: my-bucket
          key: 'warnings/${! timestamp_unix_nano() }.json'
      - label: info
        filter:
          check: root.routed_to == "info_channel"
        s3:
          bucket: my-bucket
          key: 'info/${! timestamp_unix_nano() }.json'

Example 2: Extract Fields and Batch Upload

Tail logs, parse structured data, enrich, and batch for efficient upload:

input:
  generate:
    count: 0

pipeline:
  threads: 2
  processors:
    - subprocess:
        name: tail
        args:
          - "-F"
          - "/opt/data/sensor.log"
    - parse_log:
        format: json
    - mapping: |
        root.timestamp = now().ts_format("2006-01-02T15:04:05Z07:00")
        root.node_id = env("NODE_ID").or("unknown")
        root.region = env("REGION").or("us-east-1")

output:
  batch:
    condition:
      type: count
      count: 100
    output:
      s3:
        bucket: my-bucket
        key: 'logs/year=${! now().ts_format("2006") }/month=${! now().ts_format("01") }/day=${! now().ts_format("02") }/${! timestamp_unix_nano() }.jsonl'
        codec: jsonlines

Example 3: Real-time Alert on Error Patterns

Tail logs and trigger alerts on specific error patterns:

input:
  generate:
    count: 0

pipeline:
  processors:
    - subprocess:
        name: tail
        args:
          - "-F"
          - "/var/log/application.log"
    - regex:
        pattern: 'ERROR.*OutOfMemory|ERROR.*StackOverflow'
    - mapping: |
        root.alert = true
        root.severity = "critical"
        root.timestamp = now()
        root.host = hostname()

output:
  broker:
    pattern: fan_out # Both outputs receive every message; round_robin would split them between the webhook and the backup file
    outputs:
      - label: critical_errors
        filter:
          check: root.alert == true
        http_client:
          url: "https://alerts.example.com/webhook"
          verb: POST
      - label: file_backup
        file:
          path: /var/log/expanso-critical-errors.jsonl
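Before deploying an alert pattern, it is worth checking what it matches against a few representative lines. The sketch below uses grep -E, assuming the regex processor treats a simple alternation like this one the same way POSIX extended regular expressions do (not every pattern behaves identically across regex engines). The sample lines are made up for illustration.

```shell
#!/usr/bin/env bash
# Preview which lines the Example 3 alert pattern would match.
# Sample log lines are invented for illustration.
set -euo pipefail

ALERT_PATTERN='ERROR.*OutOfMemory|ERROR.*StackOverflow'

# Print only matching lines; grep exits 1 when nothing matches, which is not an error here
match_alert_lines() {
  grep -E "$ALERT_PATTERN" || true
}

sample_log() {
  cat <<'EOF'
2024-05-01T10:00:00Z INFO  request served in 12ms
2024-05-01T10:00:01Z ERROR java.lang.OutOfMemoryError: Java heap space
2024-05-01T10:00:02Z WARN  OutOfMemory risk: heap at 91%
2024-05-01T10:00:03Z ERROR java.lang.StackOverflowError
2024-05-01T10:00:04Z ERROR connection refused
EOF
}

sample_log | match_alert_lines
```

Only the two ERROR lines are printed; the WARN line mentioning OutOfMemory is skipped because the pattern requires ERROR earlier on the same line.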

Common Tail Flags Reference

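Flag | Description | Example
--- | --- | ---
-f | Follow the open file descriptor (GNU default); after a rename-based rotation it keeps reading the old file | tail -f app.log
-F | Follow by name and retry (same as --follow=name --retry); survives rotation | tail -F app.log
-n N | Output the last N lines (default 10) | tail -n 50 app.log
-n +N | Output starting from line N | tail -n +1 app.log
--pid=PID | With -f or -F, exit after process PID dies (GNU) | tail -F --pid=$$ app.log
-q | Never print file-name headers (only relevant with multiple files) | tail -q -F app.log
--retry | Keep trying to open the file if it is inaccessible (GNU; implied by -F) | tail -F --retry app.log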

Performance Considerations

1. Output Rate vs Processing Speed

If logs arrive faster than your pipeline can process them, consider:

pipeline:
  threads: 4 # Use multiple threads to increase throughput
  processors:
    - subprocess:
        name: tail
        args: ["-F", "/var/log/app.log"]

2. Buffer Size

For very large log lines, increase the subprocess buffer:

subprocess:
  name: tail
  args:
    - "-F"
    - "/var/log/app.log"
  max_buffer: 262144 # Increase from default 65536 (64KB) to 256KB

3. Batching for Efficiency

Batch logs before writing to external systems:

output:
  batch:
    condition:
      type: interval
      interval: 5s
    output:
      http_client:
        url: "https://ingest.example.com/logs"

Production Readiness

Subprocess Restart Behavior

The subprocess processor is designed to keep long-running processes alive:

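  • If tail exits early, Expanso Edge automatically restarts it
  • Restart is transparent: the pipeline continues processing new logs
  • No checkpoint in tail: -F reopens the file after rotation, but tail keeps no state across restarts. A restarted tail begins again from its startup window (the last 10 lines by default, or whatever -n specifies), so lines can be duplicated or missed around a restart
  • Exponential backoff: prevents rapid restart loops if there's a persistent issue

Important: reduce the chance of tail exiting by using flags that tolerate rotation and missing files:

subprocess:
  name: tail
  args:
    - "-F" # Follow by name (survives rotation)
    - "--retry" # Keep retrying if the file is unavailable (already implied by -F in GNU tail)
    - "/var/log/app.log"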
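The claim that -F tolerates a missing file is easy to check. The sketch below (GNU tail assumed) starts tail -F on a path that does not exist yet, creates the file a second later, and shows that the line still arrives; it then shows that plain tail -f gives up immediately on a missing path. retry_demo is an illustrative name, and the sleeps and timeout guard are arbitrary.

```shell
#!/usr/bin/env bash
# Show that GNU tail -F waits for a missing file while tail -f exits.
set -euo pipefail

retry_demo() {
  local dir status=0
  dir=$(mktemp -d)

  # The file does not exist yet; -F (= --follow=name --retry) keeps checking for it
  tail -s 0.2 -F "$dir/late.log" > "$dir/out" 2>/dev/null &
  local pid=$!
  sleep 1
  echo "first line" > "$dir/late.log"
  sleep 2
  kill "$pid" 2>/dev/null || true
  wait "$pid" 2>/dev/null || true
  printf 'tail -F received: %s\n' "$(cat "$dir/out")"

  # Plain -f has nothing to follow, so it exits with an error;
  # timeout only guards against a tail implementation that behaves differently
  timeout 5 tail -f "$dir/never.log" 2>/dev/null || status=$?
  printf 'tail -f exit status: %s\n' "$status"
  rm -rf "$dir"
}

retry_demo
```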

Error Handling & Recovery Patterns

Pattern 1: Fall Back to Batch Processing

If tail fails (file deleted, permissions lost), fall back to reading existing logs:

pipeline:
  processors:
    - try:
        - subprocess:
            name: tail
            args:
              - "-F"
              - "${LOG_FILE:/var/log/app.log}"
        - mapping: root.source = "tail"
      catch:
        - file:
            glob: "${LOG_FILE:/var/log/app.log}"
        - mapping: |
            root.source = "batch_fallback"
            root.recovered = true

This ensures that if tail fails to start or crashes repeatedly, the pipeline switches to reading the file in batches instead of going silent.

Pattern 2: Health Check with Heartbeat

Have the subprocess print a JSON heartbeat line to stdout next to the log lines. Because the loop stops when tail exits, missing heartbeats downstream indicate that tail is no longer running:

input:
  generate:
    count: 0

pipeline:
  processors:
    - subprocess:
        name: bash
        args:
          - "-c"
          - |
            # Follow the log; while tail is alive, print a heartbeat to stdout every 30 seconds
            tail -F /var/log/app.log &
            TAIL_PID=$!
            while kill -0 "$TAIL_PID" 2>/dev/null; do
              printf '{"heartbeat":true,"timestamp":"%s"}\n' "$(date -u +%Y-%m-%dT%H:%M:%SZ)"
              sleep 30
            done
    - mapping: |
        root.collected_at = now()

output:
  broker:
    outputs:
      - label: live_logs
        filter:
          check: root.heartbeat != true
        # Your log output here
      - label: health
        filter:
          check: root.heartbeat == true
        http_client:
          url: "https://monitoring.example.com/health"
          verb: POST
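To try the heartbeat loop outside Expanso Edge, you can wrap the same logic in a shell function and run it against a scratch file with a short interval. tail_with_heartbeat is a hypothetical name; the JSON shape matches the heartbeat lines in the pattern above.

```shell
#!/usr/bin/env bash
# Local harness for a tail-plus-heartbeat loop.
set -euo pipefail

# Follow $1 and print a JSON heartbeat every $2 seconds (default 30) while tail is alive
tail_with_heartbeat() {
  local log=$1 interval=${2:-30}
  tail -F "$log" &
  local tail_pid=$!
  while kill -0 "$tail_pid" 2>/dev/null; do
    printf '{"heartbeat":true,"timestamp":"%s"}\n' "$(date -u +%Y-%m-%dT%H:%M:%SZ)"
    sleep "$interval"
  done
  # Reached only if tail exits; return its status
  wait "$tail_pid"
}
```

For example, export -f tail_with_heartbeat followed by timeout 5 bash -c 'tail_with_heartbeat /var/log/app.log 1' prints the file's last lines plus a heartbeat roughly once per second. GNU timeout signals the whole process group, so the background tail stops as well.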

Pattern 3: Wrap Tail with Health Wrapper Script

For maximum reliability, wrap tail in a script that handles cleanup:

/usr/local/bin/tail-wrapper.sh:

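#!/bin/bash
set -euo pipefail

LOG_FILE="${1:?LOG_FILE required}"
HEALTH_FILE="${2:-/tmp/tail-health.txt}"
TAIL_PID=""

# Clean shutdown: stop tail and remove the health marker
cleanup() {
  if [ -n "$TAIL_PID" ]; then
    kill "$TAIL_PID" 2>/dev/null || true
  fi
  rm -f "$HEALTH_FILE"
}
trap cleanup EXIT
trap 'exit 143' TERM
trap 'exit 130' INT

# Follow the file by name; tail's output is this script's stdout
tail -F "$LOG_FILE" &
TAIL_PID=$!

# Write a health marker (Unix seconds) every 10 seconds while tail is alive
while kill -0 "$TAIL_PID" 2>/dev/null; do
  date -u +%s > "$HEALTH_FILE"
  sleep 10
done

# tail exited on its own: return its status so the failure is visible to the caller
wait "$TAIL_PID"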

Then use it in your pipeline:

subprocess:
  name: /usr/local/bin/tail-wrapper.sh
  args:
    - "/var/log/app.log"
    - "/tmp/tail-${NODE_ID}.health"

Preventing Data Loss

1. Atomic Output Writes

Always use transactional or batch outputs:

output:
  batch:
    condition:
      type: count
      count: 100 # Don't send partial batches
    output:
      s3:
        bucket: logs
        key: 'batch-${! timestamp_unix_nano() }.jsonl'

2. Retry on Failure

Add retry logic to your outputs:

output:
  retry:
    max_retries: 3
    backoff:
      initial_interval: 1s
      max_interval: 30s
      multiplier: 2
    output:
      http_client:
        url: "https://ingest.example.com"
        timeout: 10s

3. Local Backup

Always maintain a local backup alongside cloud/remote storage:

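output:
  broker:
    pattern: fan_out # Send every message to both outputs; round_robin would split them instead of backing them up
    outputs:
      - label: primary
        http_client:
          url: "https://ingest.example.com"
          retry:
            max_retries: 3
      - label: backup
        file:
          path: '/var/log/expanso-backup-${! now().ts_format("2006-01-02") }.jsonl' # One file per day rather than per message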

Resource Limits

Prevent tail from consuming excessive memory or CPU:

pipeline:
  threads: 1 # Limit to 1 thread for subprocess (tail is single-threaded anyway)
  processors:
    - subprocess:
        name: tail
        args:
          - "-F"
          - "/var/log/app.log"
        max_buffer: 65536 # Default is good; increase only if needed

Then enforce OS-level limits:

# Limit Expanso Edge process to 512MB RAM, 50% CPU
# (MemoryMax= applies on cgroup v2; MemoryLimit= is the deprecated cgroup v1 name)
systemctl set-property expanso.service MemoryMax=512M
systemctl set-property expanso.service CPUQuota=50%

Monitoring Tail Health

Check Logs for Restarts

Watch Expanso logs for subprocess restart messages:

journalctl -u expanso.service -f | grep -i "subprocess\|restart\|error"

Verify File Permissions

Regular checks prevent permission-related failures:

# Add to crontab
0 * * * * /usr/local/bin/check-log-perms.sh

/usr/local/bin/check-log-perms.sh:

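#!/bin/bash
LOG_FILE="/var/log/app.log"
EXPANSO_USER="expanso"

# [ -r ] tests the *current* user; when cron runs this as root it would always pass,
# so run the check as the Expanso user instead
if [ "$(id -un)" = "$EXPANSO_USER" ]; then
  test -r "$LOG_FILE"
else
  sudo -u "$EXPANSO_USER" test -r "$LOG_FILE"
fi || {
  echo "ERROR: $EXPANSO_USER cannot read $LOG_FILE" | logger -t check-log-perms
  exit 1
}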

Monitor File Handle Limits

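If tail (or Expanso Edge itself) runs out of file descriptors:

# Check the limit of the current shell (a systemd service may have a different one)
ulimit -n

# Set a higher limit in the service's unit file or a drop-in,
# then run systemctl daemon-reload and restart the service
[Service]
LimitNOFILE=65536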
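Because ulimit -n only reports the limit of the shell you run it in, it helps to look at the running service directly. On Linux, a process's limits can be read from /proc/<pid>/limits; the sketch below extracts the open-files line (open_file_limits is an illustrative name).

```shell
#!/usr/bin/env bash
# Print the soft and hard "Max open files" limits of a running process (Linux only).
set -euo pipefail

open_file_limits() {
  local pid=${1:-$$}
  # Line format: "Max open files  <soft>  <hard>  files"
  awk '/^Max open files/ { print "soft=" $4, "hard=" $5 }' "/proc/$pid/limits"
}

open_file_limits "$$"
```

Usage against the service: open_file_limits "$(systemctl show -p MainPID --value expanso.service)", which requires the service to be running so that MainPID is not 0.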

Alert on Tail Failure

Create a secondary pipeline to detect when tail stops working:

input:
  file:
    glob: /tmp/tail-health.txt # Must match the health file the wrapper writes

pipeline:
  processors:
    - mapping: |
        let written_at = content().string().trim().number()
        let age_seconds = now().ts_unix() - $written_at
        root.is_stale = $age_seconds > 30

output:
  broker:
    outputs:
      - label: alert_if_stale
        filter:
          check: root.is_stale == true
        http_client:
          url: "https://alerts.example.com/critical"
          verb: POST
          body: 'Tail process appears stuck or crashed'
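If you would rather run the staleness check from cron than from a second pipeline, the same logic can be sketched in shell. The wrapper script writes the Unix time in seconds to its health file every 10 seconds, so a file older than about 30 seconds suggests tail has stopped. health_is_fresh is a hypothetical helper; point it at the same health file the wrapper writes.

```shell
#!/usr/bin/env bash
# Shell version of the staleness check: is the health file's Unix timestamp recent?
set -euo pipefail

# Usage: health_is_fresh FILE [MAX_AGE_SECONDS]
# Returns 0 if fresh, 1 if stale, missing, or not a valid timestamp
health_is_fresh() {
  local file=$1 max_age=${2:-30} written_at now
  [ -r "$file" ] || return 1
  written_at=$(tr -d '[:space:]' < "$file")
  case "$written_at" in
    '' | *[!0-9]*) return 1 ;;   # empty or not a plain Unix timestamp
  esac
  now=$(date -u +%s)
  [ $(( now - written_at )) -le "$max_age" ]
}

if health_is_fresh "${1:-/tmp/tail-health.txt}" 30; then
  echo "fresh"
else
  echo "stale"
fi
```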

Troubleshooting

"Permission denied" errors

Ensure the Expanso Edge process has read permissions on the log file:

# Check permissions
ls -la /var/log/app.log

# Run Expanso with appropriate user/group
sudo -u loguser expanso start

Subprocess hangs or no output

Check that:

  1. The log file path is correct
  2. The file actually exists and is being written to
  3. Use -F instead of -f for production (handles rotation)
  4. With GNU tail, -F already implies --retry; adding --retry explicitly is harmless and documents the intent:
subprocess:
  name: tail
  args:
    - "-F"
    - "--retry"
    - "/var/log/app.log"

High CPU usage

If subprocess is consuming too much CPU:

  • Reduce pipeline threads
  • Check if the log file is being hammered with writes
  • Consider filtering earlier in the pipeline to reduce downstream processing

Tail with Other Commands

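The subprocess processor works with any command, not just tail. You can combine commands in a shell pipeline. When piping through grep, add --line-buffered: otherwise grep buffers its output when writing to a pipe, so matches arrive in bursts instead of line by line:

# Use tail + grep to filter inside the subprocess
subprocess:
  name: bash
  args:
    - "-c"
    - "tail -F /var/log/app.log | grep --line-buffered ERROR"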
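To confirm that matches are flushed promptly, the sketch below runs the same tail | grep --line-buffered pipeline against a scratch file for a few seconds, appends a few lines, and checks what came out. It assumes GNU tail, grep, and timeout; follow_errors and follow_errors_demo are illustrative names.

```shell
#!/usr/bin/env bash
# Follow a log and pass through only ERROR lines, flushing each match immediately.
set -euo pipefail

follow_errors() {
  tail -n 0 -F "$1" | grep --line-buffered ERROR
}

follow_errors_demo() {
  local dir pid
  dir=$(mktemp -d)
  : > "$dir/app.log"
  export -f follow_errors
  # Run the pipeline for 3 seconds in the background, capturing its output in a file
  timeout 3 bash -c 'follow_errors "$1"' _ "$dir/app.log" > "$dir/out" &
  pid=$!
  sleep 1
  printf '%s\n' "INFO started" "ERROR disk full" "INFO done" >> "$dir/app.log"
  wait "$pid" || true   # timeout exits with 124 when it stops the pipeline
  cat "$dir/out"
  rm -rf "$dir"
}

follow_errors_demo
```

Only ERROR disk full is printed. Without --line-buffered, grep could still be holding that line in its buffer when timeout terminates it, and the output file would stay empty.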

Next Steps