Metadata Enrichment for Lineage
This example demonstrates how to stamp every message with pipeline provenance, node identity, and custom compliance tags using the metadata processor. Use it as a starting point for building OpenLineage-style lineage events or compliance audit trails.
Download & Run
Quick Start:
# Download and run directly
curl -sSL https://docs.expanso.io/examples/metadata-enrichment.yaml | expanso-edge run -
# Or download first, customize, then run
curl -o my-pipeline.yaml https://docs.expanso.io/examples/metadata-enrichment.yaml
expanso-edge run -f my-pipeline.yaml
Download: metadata-enrichment.yaml
What This Pipeline Does
- Generates a synthetic event every 5 seconds (no external dependencies)
- Enriches each event with pipeline, node, and OpenLineage-aligned identity fields
- Tags events with custom compliance metadata (owner, compliance tier)
- Outputs the enriched JSON to stdout so you can inspect the result
Complete Pipeline
input:
  generate:
    mapping: |
      root = {
        "event_id": uuid_v4(),
        "level": "info",
        "message": "user signup"
      }
    interval: 5s
pipeline:
  processors:
    - metadata:
        include: [core, pipeline, node]
        custom:
          pipeline_owner: [email protected]
          compliance_tier: pii
        target: body
        format: nested
        body_key: lineage
output:
  stdout:
    codec: lines
Configuration Breakdown
Input: Generate
input:
  generate:
    mapping: |
      root = {
        "event_id": uuid_v4(),
        "level": "info",
        "message": "user signup"
      }
    interval: 5s
The generate input emits a synthetic message every 5 seconds, so the pipeline runs anywhere without external dependencies. Replace it with kafka, file, or any other input when wiring up to real data.
See: Generate Input
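As a rough sketch of swapping in a real source, the fragment below reads newline-delimited JSON from a local file instead of generating events. It assumes the Bento-style file input and its paths field are available in your build; the path ./events.jsonl is a hypothetical placeholder.

```yaml
# Sketch only: assumes a Bento-style `file` input is available.
# ./events.jsonl is a hypothetical path holding one JSON event per line.
input:
  file:
    paths: [ ./events.jsonl ]
```

The rest of the pipeline stays unchanged, since the metadata processor does not depend on where the messages come from.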
Processor: Metadata Enrichment
- metadata:
    include: [core, pipeline, node]
    custom:
      pipeline_owner: [email protected]
      compliance_tier: pii
    target: body
    format: nested
    body_key: lineage
The metadata processor nests three categories of metadata under the lineage key set by body_key:
- core: OpenLineage-aligned identity (run_id, job_name, job_namespace, event_time, producer)
- pipeline: pipeline name and version
- node: node ID, region, environment, cluster, hostname, agent version
The custom: block adds two static fields that travel with every event for ownership and compliance tracking.
See: Metadata Processor Reference
Output: Stdout
output:
  stdout:
    codec: lines
Each enriched event is printed as a single JSON line. Swap this for kafka, aws_s3, or http_client when you're ready to send the data downstream.
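A minimal sketch of the Kafka variant, assuming the Bento-style kafka output with its addresses and topic fields is available in your build. The broker address and the topic name enriched-events are hypothetical placeholders.

```yaml
# Sketch only: assumes a Bento-style `kafka` output is available.
# The broker address and topic name are placeholders, not defaults.
output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: enriched-events
```

Keeping stdout while you iterate and switching the output last makes it easier to confirm the lineage block looks right before anything is sent downstream.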
Example Output
A single emitted event looks like this:
{
  "event_id": "9f3e1b8a-2c5d-4e7f-9a01-3b2c4d5e6f7a",
  "level": "info",
  "message": "user signup",
  "lineage": {
    "run_id": "01HF7K8M9N0P1Q2R3S4T5V6W7X",
    "job_name": "lineage-demo",
    "job_namespace": "default",
    "event_time": "2026-04-30T12:34:56.789Z",
    "producer": "https://expanso.io/edge/v1.4.2",
    "pipeline_name": "lineage-demo",
    "pipeline_version": "v1",
    "git_commit_sha": "",
    "git_repo_url": "",
    "git_branch": "",
    "node_id": "node-eu-west-1-a",
    "hostname": "edge-01",
    "region": "eu-west-1",
    "environment": "production",
    "cluster_name": "edge-eu",
    "agent_version": "1.4.2",
    "pipeline_owner": "[email protected]",
    "compliance_tier": "pii"
  }
}
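The lineage block above carries most of the identity fields an OpenLineage-style run event needs. The fragment below is a sketch of reshaping it, assuming a Bento-style mapping processor can follow the metadata processor and that the body has the nested shape shown above. The eventType value is a placeholder. A spec-compliant event would also need a schemaURL and a UUID-formatted runId, neither of which this example supplies.

```yaml
# Sketch only: reshapes the nested lineage fields into an
# OpenLineage-style layout. Not a complete or validated event.
- mapping: |
    root.eventType = "OTHER"   # placeholder; pick the type that fits your run state
    root.eventTime = this.lineage.event_time
    root.producer = this.lineage.producer
    root.run.runId = this.lineage.run_id
    root.job.namespace = this.lineage.job_namespace
    root.job.name = this.lineage.job_name
```

Because a mapping processor builds a new document, the original event_id, level, and message fields are dropped unless you copy them across explicitly.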
Common Variations
Attach to message metadata instead of the body
Use target: meta to write to Bento metadata, then reference fields downstream via interpolation (${! metadata("region") }).
- metadata:
    include: [core, pipeline, node]
    target: meta
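To illustrate the downstream reference, here is a sketch that copies one metadata value back into the body with a mapping step. It assumes the key is named region, as in the interpolation example above, and that a Bento-style mapping processor is available.

```yaml
# Sketch only: writes lineage to message metadata, then copies
# one value into the body. The key name "region" is assumed.
pipeline:
  processors:
    - metadata:
        include: [core, pipeline, node]
        target: meta
    - mapping: |
        root = this                       # keep the original event body
        root.region = metadata("region")  # null if the key is absent
```

With target: meta the body stays untouched by default, so only the fields you copy explicitly end up in the payload.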
Surface runtime counters
Opt in to the runtime category to attach records_in, bytes_out, error_count, and duration_ms:
- metadata:
    include: [pipeline, runtime]
    target: body
    format: nested
    body_key: stats
Flat-merge into the body
If your downstream consumer expects a flat JSON object, use format: flat. Metadata keys overwrite body keys on collision, so use exclude to drop any fields that would clash.
- metadata:
    include: [pipeline]
    exclude: [git_commit_sha, git_repo_url, git_branch]
    target: body
    format: flat
Next Steps
- Metadata Processor Reference: full configuration reference, all field categories, validation rules
- Pipeline Metadata Guide: the implicit @pipeline_id, @node_id, and @node_label_* keys
- Quick Start Guide: deploying pipelines to a real edge node