Metrics Collection & Monitoring
This example collects metrics on a schedule and routes them to multiple destinations using the broker output. It shows how to generate structured metrics data, calculate derived values such as a health score, and fan out to more than one output.
Download & Run
Quick Start:
# Download and run directly
curl -sSL https://docs.expanso.io/examples/metrics-collector.yaml | expanso-edge run -
# Or download first, customize, then run
curl -o my-pipeline.yaml https://docs.expanso.io/examples/metrics-collector.yaml
expanso-edge run -f my-pipeline.yaml
Download: metrics-collector.yaml
What This Pipeline Does
- Collects metrics every 30 seconds
- Enriches with metadata (node ID, version)
- Calculates health scores based on thresholds
- Routes to multiple outputs (stdout and file)
Complete Pipeline
input:
  generate:
    interval: 30s
    mapping: |
      # Collect system metrics
      root.timestamp = now()
      root.host = env("HOSTNAME").or("localhost")
      root.metrics.cpu_usage = random_int(min: 0, max: 100)
      root.metrics.memory_usage = random_int(min: 0, max: 100)
      root.metrics.disk_usage = random_int(min: 0, max: 100)

pipeline:
  processors:
    - mapping: |
        # Add metadata
        root.node_id = env("NODE_ID").or("unknown")
        root.edge_version = env("EDGE_VERSION").or("dev")

        # Calculate health score
        let cpu = this.metrics.cpu_usage
        let mem = this.metrics.memory_usage
        let disk = this.metrics.disk_usage
        root.health_score = if $cpu > 90 || $mem > 90 || $disk > 90 {
          "critical"
        } else if $cpu > 70 || $mem > 70 || $disk > 70 {
          "warning"
        } else {
          "healthy"
        }

output:
  broker:
    pattern: fan_out
    outputs:
      - stdout:
          codec: lines
      - file:
          path: /var/log/expanso/metrics.json
          codec: lines
Configuration Breakdown
Input: Generate Metrics
input:
  generate:
    interval: 30s
    mapping: |
      root.timestamp = now()
      root.host = env("HOSTNAME").or("localhost")
      root.metrics.cpu_usage = random_int(min: 0, max: 100)
      root.metrics.memory_usage = random_int(min: 0, max: 100)
      root.metrics.disk_usage = random_int(min: 0, max: 100)
Generates metrics every 30 seconds with:
- timestamp: Current time in RFC3339 format
- host: Hostname from the HOSTNAME environment variable
- metrics: Simulated CPU, memory, and disk usage percentages
Note: This example uses random_int() to simulate metrics. For real system metrics, you would integrate with actual monitoring tools or use the http_client input to poll metrics endpoints.
See: Generate Input
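As a quick illustration of the note above, one of the simulated values could be swapped for a real reading. The sketch below is not part of the downloadable example; it assumes a Linux host and that Bloblang's file() function is available in your build, and reads the 1-minute load average from /proc/loadavg:

input:
  generate:
    interval: 30s
    mapping: |
      root.timestamp = now()
      root.host = env("HOSTNAME").or("localhost")
      # /proc/loadavg starts with the 1-, 5- and 15-minute load averages
      root.metrics.load_1m = file("/proc/loadavg").string().split(" ").index(0).number()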
Processor: Enrich & Score
pipeline:
  processors:
    - mapping: |
        # Add metadata
        root.node_id = env("NODE_ID").or("unknown")
        root.edge_version = env("EDGE_VERSION").or("dev")

        # Calculate health score
        let cpu = this.metrics.cpu_usage
        let mem = this.metrics.memory_usage
        let disk = this.metrics.disk_usage
        root.health_score = if $cpu > 90 || $mem > 90 || $disk > 90 {
          "critical"
        } else if $cpu > 70 || $mem > 70 || $disk > 70 {
          "warning"
        } else {
          "healthy"
        }
This processor:
- Adds metadata: Node ID and version from environment variables
- Calculates health score based on thresholds:
  - critical: Any metric > 90%
  - warning: Any metric > 70%
  - healthy: All metrics ≤ 70%
See: Mapping Processor | Bloblang Guide
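If the hard-coded thresholds don't suit every deployment, they can be read from environment variables instead. A minimal sketch, assuming hypothetical CPU_CRITICAL and CPU_WARNING variables that you define yourself:

pipeline:
  processors:
    - mapping: |
        # Fall back to the defaults used above when the variables are unset
        let critical = env("CPU_CRITICAL").or("90").number()
        let warning = env("CPU_WARNING").or("70").number()
        root.health_score = if this.metrics.cpu_usage > $critical {
          "critical"
        } else if this.metrics.cpu_usage > $warning {
          "warning"
        } else {
          "healthy"
        }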
Output: Fan Out to Multiple Destinations
output:
  broker:
    pattern: fan_out
    outputs:
      - stdout:
          codec: lines
      - file:
          path: /var/log/expanso/metrics.json
          codec: lines
The broker output with fan_out pattern sends each message to all outputs:
- stdout: For real-time monitoring/debugging
- file: For persistent storage and later analysis
See: Broker Output | File Output
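The fan_out pattern is not limited to two destinations; any additional output can be appended to the same list. For example, to also forward every metric to a central collector (the URL below is a placeholder, not part of the example):

output:
  broker:
    pattern: fan_out
    outputs:
      - stdout:
          codec: lines
      - file:
          path: /var/log/expanso/metrics.json
          codec: lines
      # Hypothetical central collector
      - http_client:
          url: https://metrics.example.com/ingest
          verb: POST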
Example Output
Each metric message looks like:
{
  "timestamp": "2024-01-15T10:30:00Z",
  "host": "edge-node-01",
  "metrics": {
    "cpu_usage": 45,
    "memory_usage": 62,
    "disk_usage": 38
  },
  "node_id": "edge-001",
  "edge_version": "1.2.0",
  "health_score": "healthy"
}
Critical alert example:
{
  "timestamp": "2024-01-15T10:35:00Z",
  "host": "edge-node-01",
  "metrics": {
    "cpu_usage": 95,
    "memory_usage": 88,
    "disk_usage": 42
  },
  "node_id": "edge-001",
  "edge_version": "1.2.0",
  "health_score": "critical"
}
Common Variations
Fetch Real Metrics from API
Replace simulated data with actual metrics:
input:
  http_client:
    url: http://localhost:9090/metrics
    verb: GET
    interval: 30s

pipeline:
  processors:
    - mapping: |
        # Parse the response body (assumes the endpoint returns JSON)
        root = content().parse_json()
        root.node_id = env("NODE_ID").or("unknown")
        root.collected_at = now()
Alert on Critical Metrics
Send critical metrics to an alerting endpoint:
output:
  broker:
    pattern: fan_out
    outputs:
      # Always write to file
      - file:
          path: /var/log/expanso/metrics.json

      # Alert on critical health
      - switch:
          cases:
            - check: this.health_score == "critical"
              output:
                http_client:
                  url: http://alerts.example.com/webhook
                  verb: POST
                  headers:
                    Content-Type: application/json
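The webhook above receives the full metric document as-is. If your alerting endpoint expects a different shape, outputs can carry their own processors to reshape messages just before delivery; a sketch of the critical case, with illustrative field names in the alert body:

- check: this.health_score == "critical"
  output:
    http_client:
      url: http://alerts.example.com/webhook
      verb: POST
      headers:
        Content-Type: application/json
    processors:
      - mapping: |
          # Reduce the metric document to a compact alert body
          root.summary = "Critical health score on " + this.host
          root.cpu = this.metrics.cpu_usage
          root.memory = this.metrics.memory_usage
          root.disk = this.metrics.disk_usage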
Different Metrics Per Environment
Adjust collection based on environment:
input:
  generate:
    interval: '@every 1m'
    mapping: |
      root.timestamp = now()
      root.environment = env("ENVIRONMENT").or("production")

      # More detailed metrics in production
      root.metrics = if env("ENVIRONMENT").or("production") == "production" {
        {
          "cpu": random_int(min: 0, max: 100),
          "memory": random_int(min: 0, max: 100),
          "disk": random_int(min: 0, max: 100),
          "network_in": random_int(min: 0, max: 1000),
          "network_out": random_int(min: 0, max: 1000),
          "active_connections": random_int(min: 0, max: 500)
        }
      } else {
        {
          "cpu": random_int(min: 0, max: 100),
          "memory": random_int(min: 0, max: 100)
        }
      }
Aggregate Metrics Over Time
Emit each reading as a gauge metric so a metrics exporter can aggregate it over time:
pipeline:
  processors:
    - mapping: |
        root = this
        root.collected_at = now()

    # Buffer and aggregate (requires additional configuration)
    - metric:
        type: gauge
        name: cpu_usage
        labels:
          host: ${! this.host }
        value: ${! this.metrics.cpu_usage }

    - metric:
        type: gauge
        name: memory_usage
        labels:
          host: ${! this.host }
        value: ${! this.metrics.memory_usage }
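The metric processor only emits into whichever metrics exporter the config defines, so pair it with a top-level metrics section. A minimal sketch, assuming the Prometheus exporter is available in your build:

metrics:
  prometheus: {}

Prometheus can then scrape the aggregated gauges from the service's HTTP metrics endpoint.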
Route by Health Score
Send different health scores to different destinations:
output:
  switch:
    cases:
      # Critical metrics go to PagerDuty
      - check: this.health_score == "critical"
        output:
          http_client:
            url: https://events.pagerduty.com/v2/enqueue
            verb: POST

      # Warning metrics go to Slack
      - check: this.health_score == "warning"
        output:
          http_client:
            url: https://hooks.slack.com/services/YOUR/WEBHOOK/URL
            verb: POST

      # Healthy metrics just get logged
      - output:
          file:
            path: /var/log/expanso/healthy-metrics.json
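Both PagerDuty and Slack expect specific request bodies, which the output-level processors technique shown earlier can produce. A rough sketch for the critical case, using PagerDuty's Events API v2 fields and a hypothetical PAGERDUTY_ROUTING_KEY environment variable (verify the payload against PagerDuty's current documentation):

- check: this.health_score == "critical"
  output:
    http_client:
      url: https://events.pagerduty.com/v2/enqueue
      verb: POST
      headers:
        Content-Type: application/json
    processors:
      - mapping: |
          root.routing_key = env("PAGERDUTY_ROUTING_KEY").or("")
          root.event_action = "trigger"
          root.payload.summary = "Critical health score on " + this.host
          root.payload.source = this.host
          root.payload.severity = "critical"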
Add Statistical Analysis
Calculate summary statistics across the collected metrics:
pipeline:
  processors:
    - mapping: |
        # Store metrics
        root = this

        # Calculate average across all metrics
        let metrics_list = [
          this.metrics.cpu_usage,
          this.metrics.memory_usage,
          this.metrics.disk_usage
        ]
        root.avg_utilization = $metrics_list.sum() / $metrics_list.length()
        root.max_utilization = $metrics_list.max()
        root.min_utilization = $metrics_list.min()
Real-World Integration
Prometheus Metrics
For production monitoring, integrate with Prometheus:
input:
  http_client:
    url: http://prometheus:9090/api/v1/query?query=up
    verb: GET
    headers:
      Accept: application/json
    interval: 30s

pipeline:
  processors:
    - mapping: |
        root = this.data.result.map_each(r -> {
          "metric": r.metric,
          "value": r.value.index(1).number(),
          "timestamp": now()
        })
Datadog Integration
Send metrics to Datadog:
output:
  http_client:
    url: https://api.datadoghq.com/api/v2/series
    verb: POST
    headers:
      DD-API-KEY: ${DD_API_KEY}
      Content-Type: application/json
    batching:
      count: 10
      period: 10s
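Datadog's v2 series endpoint expects a body of the form {"series": [...]} rather than the raw metric document, so reshape it in the pipeline first. A rough sketch (the metric name is illustrative, and the type code 3 for gauge follows Datadog's v2 metrics API; double-check against the current API reference):

pipeline:
  processors:
    - mapping: |
        # Timestamps are unix seconds in the v2 API
        root.series = [
          {
            "metric": "expanso.cpu_usage",
            "type": 3,
            "points": [{"timestamp": now().ts_unix(), "value": this.metrics.cpu_usage}],
            "tags": ["host:" + this.host]
          }
        ]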
InfluxDB Time-Series Storage
Store metrics in InfluxDB:
output:
  http_client:
    url: http://influxdb:8086/write?db=metrics
    verb: POST
    headers:
      Content-Type: text/plain
    batching:
      count: 100
      period: 10s
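The 1.x write endpoint shown above expects InfluxDB line protocol rather than JSON, so convert each document to a line-protocol string before output. A sketch assuming a measurement name of system_metrics and nanosecond timestamp precision:

pipeline:
  processors:
    - mapping: |
        # measurement,tag=value field=value,... timestamp_ns
        root = "system_metrics,host=%v cpu=%v,memory=%v,disk=%v %v".format(
          this.host,
          this.metrics.cpu_usage,
          this.metrics.memory_usage,
          this.metrics.disk_usage,
          now().ts_unix_nano()
        )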
Next Steps
- Generate Input - Scheduled data generation
- Broker Output - Multi-destination routing
- HTTP Client Input - Fetch metrics from APIs
- Bloblang Guide - Data transformation and scoring logic