Metrics Collection & Monitoring

This example demonstrates collecting metrics on a schedule and routing them to multiple destinations using the broker output. It shows how to generate structured metrics data, calculate derived values, and fan out to multiple outputs.

Download & Run

Quick Start:

# Download and run directly
curl -sSL https://docs.expanso.io/examples/metrics-collector.yaml | expanso-edge run -

# Or download first, customize, then run
curl -o my-pipeline.yaml https://docs.expanso.io/examples/metrics-collector.yaml
expanso-edge run -f my-pipeline.yaml

Download: metrics-collector.yaml

What This Pipeline Does

  1. Collects metrics every 30 seconds
  2. Enriches with metadata (node ID, version)
  3. Calculates health scores based on thresholds
  4. Routes to multiple outputs (stdout and file)

Complete Pipeline

input:
  generate:
    interval: 30s
    mapping: |
      # Collect system metrics
      root.timestamp = now()
      root.host = env("HOSTNAME").or("localhost")
      root.metrics.cpu_usage = random_int(min: 0, max: 100)
      root.metrics.memory_usage = random_int(min: 0, max: 100)
      root.metrics.disk_usage = random_int(min: 0, max: 100)

pipeline:
  processors:
    - mapping: |
        # Add metadata
        root.node_id = env("NODE_ID").or("unknown")
        root.edge_version = env("EDGE_VERSION").or("dev")

        # Calculate health score
        let cpu = this.metrics.cpu_usage
        let mem = this.metrics.memory_usage
        let disk = this.metrics.disk_usage

        root.health_score = if $cpu > 90 || $mem > 90 || $disk > 90 {
          "critical"
        } else if $cpu > 70 || $mem > 70 || $disk > 70 {
          "warning"
        } else {
          "healthy"
        }

output:
  broker:
    pattern: fan_out
    outputs:
      - stdout:
          codec: lines

      - file:
          path: /var/log/expanso/metrics.json
          codec: lines

Configuration Breakdown

Input: Generate Metrics

input:
  generate:
    interval: 30s
    mapping: |
      root.timestamp = now()
      root.host = env("HOSTNAME").or("localhost")
      root.metrics.cpu_usage = random_int(min: 0, max: 100)
      root.metrics.memory_usage = random_int(min: 0, max: 100)
      root.metrics.disk_usage = random_int(min: 0, max: 100)

Generates metrics every 30 seconds with:

  • timestamp: Current time in RFC 3339 format
  • host: Hostname from the HOSTNAME environment variable, falling back to localhost
  • metrics: Simulated CPU, memory, and disk usage percentages

Note: This example uses random_int() to simulate metrics. For real system metrics, you would integrate with actual monitoring tools or use the http_client input to poll metrics endpoints.

See: Generate Input
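
Tip: if you want the collection interval to be tunable without editing the pipeline, config fields can usually be populated from environment variables. A minimal sketch, assuming the standard ${VAR:default} interpolation syntax is supported and using a hypothetical METRICS_INTERVAL variable:

input:
  generate:
    # Falls back to 30s when METRICS_INTERVAL is unset
    interval: '${METRICS_INTERVAL:30s}'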

Processor: Enrich & Score

pipeline:
  processors:
    - mapping: |
        # Add metadata
        root.node_id = env("NODE_ID").or("unknown")
        root.edge_version = env("EDGE_VERSION").or("dev")

        # Calculate health score
        let cpu = this.metrics.cpu_usage
        let mem = this.metrics.memory_usage
        let disk = this.metrics.disk_usage

        root.health_score = if $cpu > 90 || $mem > 90 || $disk > 90 {
          "critical"
        } else if $cpu > 70 || $mem > 70 || $disk > 70 {
          "warning"
        } else {
          "healthy"
        }

This processor:

  1. Adds metadata: Node ID and version from environment variables
  2. Calculates health score based on thresholds:
    • critical: Any metric > 90%
    • warning: Any metric > 70%
    • healthy: All metrics ≤ 70%
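
The same thresholds can also be written as a Bloblang match expression, which some find easier to extend as scoring rules grow. A sketch equivalent to the if/else chain above:

pipeline:
  processors:
    - mapping: |
        let cpu = this.metrics.cpu_usage
        let mem = this.metrics.memory_usage
        let disk = this.metrics.disk_usage

        # Cases are evaluated top to bottom; _ is the catch-all
        root.health_score = match {
          $cpu > 90 || $mem > 90 || $disk > 90 => "critical",
          $cpu > 70 || $mem > 70 || $disk > 70 => "warning",
          _ => "healthy"
        }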

See: Mapping Processor | Bloblang Guide

Output: Fan Out to Multiple Destinations

output:
  broker:
    pattern: fan_out
    outputs:
      - stdout:
          codec: lines

      - file:
          path: /var/log/expanso/metrics.json
          codec: lines

The broker output with fan_out pattern sends each message to all outputs:

  • stdout: For real-time monitoring/debugging
  • file: For persistent storage and later analysis
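
Additional destinations can be appended to the same list. For example, to also forward each metric to a central collector over HTTP, a sketch might look like this (the collector URL is a placeholder):

output:
  broker:
    pattern: fan_out
    outputs:
      - stdout:
          codec: lines

      - file:
          path: /var/log/expanso/metrics.json
          codec: lines

      # Hypothetical third destination: forward to a central collector
      - http_client:
          url: https://collector.example.com/ingest
          verb: POST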

See: Broker Output | File Output

Example Output

Each metric message looks like:

{
  "timestamp": "2024-01-15T10:30:00Z",
  "host": "edge-node-01",
  "metrics": {
    "cpu_usage": 45,
    "memory_usage": 62,
    "disk_usage": 38
  },
  "node_id": "edge-001",
  "edge_version": "1.2.0",
  "health_score": "healthy"
}

Critical alert example:

{
  "timestamp": "2024-01-15T10:35:00Z",
  "host": "edge-node-01",
  "metrics": {
    "cpu_usage": 95,
    "memory_usage": 88,
    "disk_usage": 42
  },
  "node_id": "edge-001",
  "edge_version": "1.2.0",
  "health_score": "critical"
}

Common Variations

Fetch Real Metrics from API

Replace simulated data with actual metrics:

input:
  http_client:
    url: http://localhost:9090/metrics
    verb: GET
    interval: 30s

pipeline:
  processors:
    - mapping: |
        # Parse the JSON response body
        root = content().parse_json()
        root.node_id = env("NODE_ID").or("unknown")
        root.collected_at = now()

Alert on Critical Metrics

Send critical metrics to an alerting endpoint:

output:
  broker:
    pattern: fan_out
    outputs:
      # Always write to file
      - file:
          path: /var/log/expanso/metrics.json

      # Alert on critical health
      - switch:
          cases:
            - check: this.health_score == "critical"
              output:
                http_client:
                  url: http://alerts.example.com/webhook
                  verb: POST
                  headers:
                    Content-Type: application/json

Different Metrics Per Environment

Adjust collection based on environment:

input:
  generate:
    interval: '@every 1m'
    mapping: |
      root.timestamp = now()
      root.environment = env("ENVIRONMENT").or("production")

      # More detailed metrics in production
      root.metrics = if env("ENVIRONMENT") == "production" {
        {
          "cpu": random_int(min: 0, max: 100),
          "memory": random_int(min: 0, max: 100),
          "disk": random_int(min: 0, max: 100),
          "network_in": random_int(min: 0, max: 1000),
          "network_out": random_int(min: 0, max: 1000),
          "active_connections": random_int(min: 0, max: 500)
        }
      } else {
        {
          "cpu": random_int(min: 0, max: 100),
          "memory": random_int(min: 0, max: 100)
        }
      }

Aggregate Metrics Over Time

Register each value as a gauge metric so your metrics backend can aggregate it over time:

pipeline:
  processors:
    - mapping: |
        root = this
        root.collected_at = now()

    # Register gauges (requires a metrics exporter to be configured; see below)
    - metric:
        type: gauge
        name: cpu_usage
        labels:
          host: ${! this.host }
        value: ${! this.metrics.cpu_usage }

    - metric:
        type: gauge
        name: memory_usage
        labels:
          host: ${! this.host }
        value: ${! this.metrics.memory_usage }
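
The metric processors only register values with the runtime's metrics system; to see them you also need an exporter. A minimal sketch, assuming your build supports the standard top-level metrics section with a Prometheus exporter:

metrics:
  prometheus: {}

With an exporter in place the gauges would typically be scrapeable from the process's metrics endpoint.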

Route by Health Score

Send different health scores to different destinations:

output:
  switch:
    cases:
      # Critical metrics go to PagerDuty
      - check: this.health_score == "critical"
        output:
          http_client:
            url: https://events.pagerduty.com/v2/enqueue
            verb: POST

      # Warning metrics go to Slack
      - check: this.health_score == "warning"
        output:
          http_client:
            url: https://hooks.slack.com/services/YOUR/WEBHOOK/URL
            verb: POST

      # Healthy metrics just get logged
      - output:
          file:
            path: /var/log/expanso/healthy-metrics.json
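
Note that PagerDuty's Events API v2 expects a specific JSON envelope rather than the raw metric record, so in practice you would reshape the message first. A hedged sketch of the critical case with an inline processor; the field names follow PagerDuty's documented schema and PAGERDUTY_ROUTING_KEY is an assumed environment variable, so verify both before relying on this:

output:
  switch:
    cases:
      - check: this.health_score == "critical"
        output:
          http_client:
            url: https://events.pagerduty.com/v2/enqueue
            verb: POST
          processors:
            - mapping: |
                # Wrap the metric record in an Events API v2 envelope (assumed schema)
                root = {
                  "routing_key": env("PAGERDUTY_ROUTING_KEY").or(""),
                  "event_action": "trigger",
                  "payload": {
                    "summary": "Critical resource usage on %v".format(this.host),
                    "source": this.host,
                    "severity": "critical",
                    "custom_details": this.metrics
                  }
                }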

Add Statistical Analysis

Calculate summary statistics across the metrics in each message:

pipeline:
  processors:
    - mapping: |
        # Store metrics
        root = this

        # Calculate average across all metrics
        let metrics_list = [
          this.metrics.cpu_usage,
          this.metrics.memory_usage,
          this.metrics.disk_usage
        ]

        root.avg_utilization = $metrics_list.sum() / $metrics_list.length()
        root.max_utilization = $metrics_list.max()
        root.min_utilization = $metrics_list.min()

Real-World Integration

Prometheus Metrics

For production monitoring, integrate with Prometheus:

input:
  http_client:
    url: 'http://prometheus:9090/api/v1/query?query=up'
    verb: GET
    headers:
      Accept: application/json
    interval: 30s

pipeline:
  processors:
    - mapping: |
        root = this.data.result.map_each(r -> {
          "metric": r.metric,
          "value": r.value.index(1).number(),
          "timestamp": now()
        })
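
The mapping above leaves each message as a JSON array of query results. If you would rather emit one message per series, an unarchive processor can split the array; a sketch, assuming the standard unarchive processor is available in your build:

pipeline:
  processors:
    - mapping: |
        root = this.data.result.map_each(r -> {
          "metric": r.metric,
          "value": r.value.index(1).number(),
          "timestamp": now()
        })

    # Split the JSON array into one message per series
    - unarchive:
        format: json_array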

Datadog Integration

Send metrics to Datadog:

output:
  http_client:
    url: https://api.datadoghq.com/api/v2/series
    verb: POST
    headers:
      DD-API-KEY: ${DD_API_KEY}
      Content-Type: application/json
    batching:
      count: 10
      period: 10s
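
Datadog's v2 series endpoint also expects a particular envelope ({"series": [...]}) rather than the raw metric record, so a mapping processor is usually needed before this output. A rough sketch of that reshaping; the field names and the expanso. metric prefix are assumptions, so check Datadog's API reference for the authoritative schema:

pipeline:
  processors:
    - mapping: |
        # Assumed v2 shape: series of {metric, type, points, resources}; type 3 = gauge
        let host = this.host
        let ts = now().ts_unix()
        root.series = this.metrics.key_values().map_each(kv -> {
          "metric": "expanso." + kv.key,
          "type": 3,
          "points": [{"timestamp": $ts, "value": kv.value}],
          "resources": [{"name": $host, "type": "host"}]
        })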

InfluxDB Time-Series Storage

Store metrics in InfluxDB:

output:
  http_client:
    url: http://influxdb:8086/write?db=metrics
    verb: POST
    headers:
      Content-Type: text/plain
    batching:
      count: 100
      period: 10s
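
Because the /write endpoint takes InfluxDB line protocol rather than JSON, the metric record should be serialized first. A minimal sketch using an assumed system_metrics measurement name and the host as a tag:

pipeline:
  processors:
    - mapping: |
        # Line protocol: measurement,tag_set field_set timestamp(ns)
        let fields = "cpu=%v,memory=%v,disk=%v".format(this.metrics.cpu_usage, this.metrics.memory_usage, this.metrics.disk_usage)
        root = "system_metrics,host=%v %v %v".format(this.host, $fields, now().ts_unix_nano())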

Next Steps