
Metadata Enrichment for Lineage

This example demonstrates how to stamp every message with pipeline provenance, node identity, and custom compliance tags using the metadata processor. Use it as a starting point for building OpenLineage-style lineage events or compliance audit trails.

Download & Run

Quick Start:

# Download and run directly
curl -sSL https://docs.expanso.io/examples/metadata-enrichment.yaml | expanso-edge run -

# Or download first, customize, then run
curl -o my-pipeline.yaml https://docs.expanso.io/examples/metadata-enrichment.yaml
expanso-edge run -f my-pipeline.yaml

Download: metadata-enrichment.yaml

What This Pipeline Does

  1. Generates a synthetic event every 5 seconds (no external dependencies)
  2. Enriches each event with pipeline, node, and OpenLineage-aligned identity fields
  3. Tags events with custom compliance metadata (owner, compliance tier)
  4. Outputs the enriched JSON to stdout so you can inspect the result

Complete Pipeline

input:
  generate:
    mapping: |
      root = {
        "event_id": uuid_v4(),
        "level": "info",
        "message": "user signup"
      }
    interval: 5s

pipeline:
  processors:
    - metadata:
        include: [core, pipeline, node]
        custom:
          pipeline_owner: "[email protected]"
          compliance_tier: pii
        target: body
        format: nested
        body_key: lineage

output:
  stdout:
    codec: lines

Configuration Breakdown

Input: Generate

input:
  generate:
    mapping: |
      root = {
        "event_id": uuid_v4(),
        "level": "info",
        "message": "user signup"
      }
    interval: 5s

The generate input emits a synthetic message every 5 seconds, so the pipeline runs anywhere without external dependencies. Replace it with kafka, file, or any other input when wiring up to real data.
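As a sketch of that swap, the input below reads from Kafka instead. It assumes Bento's standard kafka input fields (addresses, topics, consumer_group); the broker address, topic, and consumer group names are placeholders, so check the Kafka input reference for your version before relying on it.

```yaml
input:
  kafka:
    # Placeholder broker address; point this at your own cluster.
    addresses: [ "localhost:9092" ]
    # Hypothetical topic carrying the raw events to enrich.
    topics: [ "user-events" ]
    # Hypothetical consumer group name.
    consumer_group: lineage-demo
```

The pipeline and output sections stay the same; only the source of the messages changes.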

See: Generate Input

Processor: Metadata Enrichment

- metadata:
    include: [core, pipeline, node]
    custom:
      pipeline_owner: "[email protected]"
      compliance_tier: pii
    target: body
    format: nested
    body_key: lineage

The metadata processor writes three categories of runtime metadata into the message body, nested under a lineage key:

  • core: OpenLineage-aligned identity (run_id, job_name, job_namespace, event_time, producer)
  • pipeline: pipeline name and version, plus Git provenance (git_commit_sha, git_repo_url, git_branch), which is empty when not set
  • node: node ID, region, environment, cluster, hostname, agent version

The custom: block adds two static fields that travel with every event for ownership and compliance tracking.

See: Metadata Processor Reference

Output: Stdout

output:
  stdout:
    codec: lines

Each enriched event is printed as a single JSON line. Swap this for kafka, aws_s3, or http_client when you're ready to send the data downstream.
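For example, here is a minimal sketch that publishes to Kafka instead of stdout, assuming Bento's kafka output fields (addresses, topic, key). The broker address and topic name are placeholders, and keying messages on event_id is an illustrative choice, not a requirement.

```yaml
output:
  kafka:
    # Placeholder broker address.
    addresses: [ "localhost:9092" ]
    # Hypothetical topic for enriched lineage events.
    topic: lineage-events
    # Use the event's own ID as the message key (illustrative choice).
    key: '${! json("event_id") }'
```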

Example Output

A single emitted event looks like this:

{
  "event_id": "9f3e1b8a-2c5d-4e7f-9a01-3b2c4d5e6f7a",
  "level": "info",
  "message": "user signup",
  "lineage": {
    "run_id": "01HF7K8M9N0P1Q2R3S4T5V6W7X",
    "job_name": "lineage-demo",
    "job_namespace": "default",
    "event_time": "2026-04-30T12:34:56.789Z",
    "producer": "https://expanso.io/edge/v1.4.2",
    "pipeline_name": "lineage-demo",
    "pipeline_version": "v1",
    "git_commit_sha": "",
    "git_repo_url": "",
    "git_branch": "",
    "node_id": "node-eu-west-1-a",
    "hostname": "edge-01",
    "region": "eu-west-1",
    "environment": "production",
    "cluster_name": "edge-eu",
    "agent_version": "1.4.2",
    "pipeline_owner": "[email protected]",
    "compliance_tier": "pii"
  }
}

Common Variations

Attach to message metadata instead of the body

Use target: meta to write the fields to Bento message metadata instead of the body, then reference them downstream via interpolation (${! metadata("region") }).

- metadata:
    include: [core, pipeline, node]
    target: meta
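To show what referencing those fields downstream might look like, this sketch pairs target: meta with a file output whose path is interpolated from the region metadata key, writing one file per region. The ./lineage directory layout is hypothetical, and the sketch assumes the node category populates a region metadata key, as in the example output.

```yaml
pipeline:
  processors:
    - metadata:
        include: [core, pipeline, node]
        target: meta

output:
  file:
    # One output file per region, taken from the metadata attached above.
    # The directory layout is a hypothetical example.
    path: './lineage/${! metadata("region") }/events.jsonl'
    codec: lines
```

Because the fields live in metadata rather than the body, the lines written to each file contain only the original event body.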

Surface runtime counters

Opt in to the runtime category to attach records_in, bytes_out, error_count, and duration_ms:

- metadata:
    include: [pipeline, runtime]
    target: body
    format: nested
    body_key: stats
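As one hypothetical use of those counters, a Bloblang mapping step placed after the metadata processor could derive a flag from the nested stats object. The has_errors field is an invented name for illustration, and the .or(0) fallback is a defensive assumption in case error_count is absent.

```yaml
pipeline:
  processors:
    - metadata:
        include: [pipeline, runtime]
        target: body
        format: nested
        body_key: stats
    # Hypothetical follow-up step: flag events whose run has recorded errors.
    - mapping: |
        root = this
        root.has_errors = this.stats.error_count.or(0) > 0
```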

Flat-merge into the body

If your downstream consumer expects a flat JSON object, use format: flat to merge the metadata fields into the top level of the body. Metadata keys overwrite body keys on collision, so use exclude to drop any metadata fields that would clash. The example below also excludes the Git fields.

- metadata:
    include: [pipeline]
    exclude: [git_commit_sha, git_repo_url, git_branch]
    target: body
    format: flat
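If you would rather keep a colliding body field than drop the metadata field, another option is to rename the body field before the merge. This sketch assumes, hypothetically, that incoming events carry their own pipeline_name key; a Bloblang mapping moves it to source_pipeline_name (an invented name) so the flat merge cannot overwrite it.

```yaml
pipeline:
  processors:
    # Hypothetical pre-step: move a body key that would clash with the
    # flat-merged pipeline_name field to a new, non-clashing key.
    - mapping: |
        root = this
        if this.exists("pipeline_name") {
          root.source_pipeline_name = this.pipeline_name
          root.pipeline_name = deleted()
        }
    - metadata:
        include: [pipeline]
        exclude: [git_commit_sha, git_repo_url, git_branch]
        target: body
        format: flat
```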

Next Steps