
Emit OpenLineage Events

Expanso Edge emits OpenLineage RunEvents natively at every pipeline lifecycle transition. Downstream catalogs and lineage tools (Marquez, OpenLineage-aware orchestrators, custom dashboards) can see when each pipeline ran, which datasets it read and wrote, and how it ended. The events are emitted directly from the edge node where the data was processed.

Every event is signed with the edge node's Ed25519 identity so downstream consumers can verify both origin and integrity. See Verify Lineage Events for the verification recipe.

When events fire

The edge emits one RunEvent per lifecycle transition. The four event types map to OpenLineage's eventType field:

| Event | When |
|---|---|
| START | The pipeline reaches the running state. |
| COMPLETE | The pipeline stream exits cleanly. |
| FAIL | The pipeline stream exits with an error. The event includes an errorMessage facet with the error and stack trace. |
| ABORT | The pipeline is stopped externally (via expanso job stop) after reaching the running state. This is distinct from COMPLETE so consumers can tell natural completions from operator intervention. |

START events carry zeroed runtime counters. COMPLETE, FAIL, and ABORT events carry the final counter values (records in/out, bytes in/out, error count) in the custom expanso_run facet.
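The following Python sketch shows what these shapes look like to a consumer. It reduces a received RunEvent to its type, run, job, and counters. It relies on the standard OpenLineage fields (eventType, run.runId, job.namespace, job.name) and on the spec's errorMessage run facet.

Two details are assumptions rather than documented behavior:

- expanso_run sits under run.facets.
- Its counters use the names shown in the Marquez walkthrough (records_in, records_out, bytes_in, bytes_out).

The function name summarize is hypothetical.

```python
import json
from typing import Any

# Counter names from the Marquez walkthrough; their location under
# run.facets.expanso_run is an assumption, not a documented contract.
COUNTERS = ("records_in", "records_out", "bytes_in", "bytes_out")


def summarize(raw: str) -> dict[str, Any]:
    """Reduce one OpenLineage RunEvent (JSON text) to dashboard-friendly fields."""
    event = json.loads(raw)
    run = event.get("run", {})
    facets = run.get("facets", {})
    expanso = facets.get("expanso_run", {})
    summary: dict[str, Any] = {
        "eventType": event["eventType"],
        "runId": run.get("runId"),
        "job": f'{event["job"]["namespace"]}/{event["job"]["name"]}',
        # START carries zeroed counters; terminal events carry final values.
        # Missing counters default to 0.
        "counters": {name: expanso.get(name, 0) for name in COUNTERS},
    }
    # FAIL events carry the OpenLineage errorMessage run facet.
    if "errorMessage" in facets:
        summary["error"] = facets["errorMessage"].get("message")
    return summary
```

For a FAIL event, the summary includes the error message alongside the final counters. For a START event, every counter is zero.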

Minimal setup

Three settings (enabled, transport, and http.endpoint) enable emission against a local OpenLineage collector:

lineage:
  enabled: true
  transport: http
  http:
    endpoint: http://localhost:5000/api/v1/lineage

Restart the edge to pick up the change. Every pipeline lifecycle transition now POSTs a JSON event to the configured endpoint.
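Before you wire up a real backend, you can confirm that events arrive using a throwaway collector. The Python sketch below uses only the standard library. It accepts POSTs on /api/v1/lineage, which is the path in the minimal config above, and prints each event's eventType and runId.

This is a debugging aid under those assumptions, not an OpenLineage backend. It keeps events only in memory and does not check signatures. The names LineageHandler and serve are hypothetical.

```python
import json
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

LINEAGE_PATH = "/api/v1/lineage"  # matches http.endpoint in the minimal config
received: list[dict] = []  # events kept in memory for inspection


class LineageHandler(BaseHTTPRequestHandler):
    def do_POST(self) -> None:
        if self.path != LINEAGE_PATH:
            self.send_error(404)
            return
        length = int(self.headers.get("Content-Length", 0))
        try:
            event = json.loads(self.rfile.read(length))
        except ValueError:  # covers invalid JSON and undecodable bytes
            self.send_error(400, "body is not JSON")
            return
        received.append(event)
        print(event.get("eventType"), event.get("run", {}).get("runId"))
        # Reply with a 2xx status; the edge counts 4xx/5xx as dropped events.
        self.send_response(201)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def log_message(self, fmt: str, *args) -> None:
        pass  # silence the default per-request access log


def serve(port: int = 5000) -> ThreadingHTTPServer:
    """Bind the collector on localhost; port 0 picks a free port."""
    return ThreadingHTTPServer(("127.0.0.1", port), LineageHandler)


if __name__ == "__main__":
    serve().serve_forever()
```

To use it:

1. Run the script with python3.
2. Restart the edge with the minimal configuration.
3. Watch for one printed line per lifecycle transition.

Stop the collector before starting Marquez, because both use port 5000.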

For the full configuration surface, see Lineage Configuration.

HTTP transport

Send events to any OpenLineage-compatible HTTP endpoint — Marquez is the reference implementation:

lineage:
  enabled: true
  transport: http
  queue_size: 1024
  drain_timeout: 5s
  http:
    endpoint: https://lineage.example.com/api/v1/lineage
    timeout: 3s
    auth:
      type: bearer
      token_env: OPENLINEAGE_TOKEN

A few behaviors to be aware of:

  • The HTTP transport does not retry on failure. Lineage delivery is at-most-once by design: a failed event is dropped, increments lineage_events_dropped_total, and is recorded in the execution's HealthTracker. The pipeline itself is never blocked or failed because lineage delivery failed.
  • The bearer token is read once at edge startup. Rotating the token requires restarting the edge.
  • When auth.type is set, the endpoint must use https://. The edge refuses to start if auth is combined with a plain http:// endpoint, so the token is never sent in cleartext.
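The delivery contract above can be modeled in a few lines:

- a bounded queue,
- one send attempt per event, with no retries,
- drop-and-count on any failure.

The Python sketch below is an illustrative model of that contract, not the edge's implementation. AtMostOnceEmitter, emit, and drain_once are hypothetical names. queue_size plays the role of the lineage.queue_size setting, and dropped_total stands in for lineage_events_dropped_total.

```python
import queue
from typing import Callable


class AtMostOnceEmitter:
    """Illustrative model: enqueue never blocks; failures drop and are counted."""

    def __init__(self, send: Callable[[dict], None], queue_size: int = 1024) -> None:
        self._send = send  # transport call; raises on HTTP or network errors
        self._queue: queue.Queue[dict] = queue.Queue(maxsize=queue_size)
        self.dropped_total = 0  # stands in for lineage_events_dropped_total
        self.errors: list[str] = []  # stands in for HealthTracker "lineage" errors

    def emit(self, event: dict) -> None:
        """Called on the pipeline's path: never blocks, never raises."""
        try:
            self._queue.put_nowait(event)
        except queue.Full:
            self._drop("queue full")

    def drain_once(self) -> None:
        """Called by a background worker: one attempt per event, no retry."""
        try:
            event = self._queue.get_nowait()
        except queue.Empty:
            return
        try:
            self._send(event)
        except Exception as exc:  # any transport failure drops the event
            self._drop(f"send failed: {exc}")

    def _drop(self, reason: str) -> None:
        self.dropped_total += 1
        self.errors.append(reason)
```

The key design point is that emit uses put_nowait rather than a blocking put. A slow or unreachable backend can therefore only cost lineage events, never pipeline throughput.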

Authentication and external secret managers

The edge does not integrate directly with Vault, AWS Secrets Manager, or GCP Secret Manager for lineage tokens. The supported pattern is to resolve the secret out-of-band and export it as an environment variable before the edge starts. With systemd:

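[Service]
# The leading "-" stops systemd from failing while the file does not exist yet;
# ExecStartPre writes it before ExecStart runs.
EnvironmentFile=-/run/expanso/lineage-token.env
ExecStartPre=/usr/local/bin/fetch-lineage-token /run/expanso/lineage-token.env
ExecStart=/usr/local/bin/expanso-edge run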

Here, fetch-lineage-token is a short script that you supply. It resolves the secret from your provider and writes OPENLINEAGE_TOKEN=<value> to the env file.
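The Python sketch below shows one way to write such a script. How the secret is fetched depends on your provider, so resolve_secret() is a hypothetical placeholder. It reads the token from a file named by LINEAGE_TOKEN_SOURCE, which is an assumed variable and not an Expanso setting. Replace it with your Vault, AWS, or GCP client call.

The parts worth keeping are:

- writing the file atomically with owner-only permissions, and
- emitting exactly one OPENLINEAGE_TOKEN=<value> line.

```python
import os
import sys
import tempfile


def resolve_secret() -> str:
    """Hypothetical placeholder: replace with your secret manager's client call.

    Reads the token from the file named by LINEAGE_TOKEN_SOURCE, an assumed
    variable used only for this sketch.
    """
    with open(os.environ["LINEAGE_TOKEN_SOURCE"], encoding="utf-8") as f:
        return f.read().strip()


def write_env_file(path: str, token: str) -> None:
    """Atomically write OPENLINEAGE_TOKEN=<token> with 0600 permissions."""
    if not token or "\n" in token:
        raise ValueError("token must be a single non-empty line")
    directory = os.path.dirname(path) or "."
    os.makedirs(directory, exist_ok=True)
    # mkstemp creates the file readable and writable only by its owner.
    fd, tmp = tempfile.mkstemp(dir=directory, prefix=".lineage-token-")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(f"OPENLINEAGE_TOKEN={token}\n")
        os.replace(tmp, path)  # atomic rename within the same directory
    except BaseException:
        os.unlink(tmp)
        raise


if __name__ == "__main__":
    # Usage from the unit file: fetch-lineage-token /run/expanso/lineage-token.env
    write_env_file(sys.argv[1], resolve_secret())
```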

File transport

For air-gapped deployments, archival, or downstream shipping by a separate process, write events to a JSON-Lines file:

lineage:
  enabled: true
  transport: file
  file:
    path: /var/lib/expanso/lineage/events.jsonl
    rotation_size_mb: 64

Each line is a complete, schema-conformant OpenLineage event. When the file exceeds rotation_size_mb, it is renamed events.jsonl.<unix-nanos> and a fresh events.jsonl is opened. Set rotation_size_mb: 0 to disable rotation.

The file transport assumes a single writer per path. If you run multiple edge processes on the same host, give each its own path.
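The rotation scheme implies a read order for a separate shipping process: first the rotated files, in ascending <unix-nanos> suffix order, then the live events.jsonl. The Python sketch below reads events in that order, assuming the naming described above. The function names are hypothetical.

The sketch does not track offsets or handle a partially written final line. A real shipper would also need to remember how far it has read in the live file.

```python
import json
import os
from typing import Iterator


def lineage_files(path: str) -> list[str]:
    """Rotated files (oldest first, by unix-nanos suffix), then the live file."""
    directory, base = os.path.split(path)
    directory = directory or "."
    rotated = []
    for name in os.listdir(directory):
        suffix = name[len(base) + 1:]
        # Only "<base>.<digits>" counts as a rotated file; skip anything else.
        if name.startswith(base + ".") and suffix.isdigit():
            rotated.append((int(suffix), os.path.join(directory, name)))
    ordered = [p for _, p in sorted(rotated)]
    if os.path.exists(path):
        ordered.append(path)
    return ordered


def read_events(path: str) -> Iterator[dict]:
    """Yield each OpenLineage event; every line is one complete JSON document."""
    for file_path in lineage_files(path):
        with open(file_path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line:  # skip blank lines
                    yield json.loads(line)
```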

Run against Marquez

Marquez is the OpenLineage reference backend and the easiest way to see your edge's lineage events in a UI. The walkthrough below assumes Docker is available locally.

1. Start Marquez

Save as docker-compose.marquez.yaml:

services:
  postgres:
    image: postgres:14
    environment:
      POSTGRES_USER: marquez
      POSTGRES_PASSWORD: marquez
      POSTGRES_DB: marquez
  marquez:
    image: marquezproject/marquez:0.45.0
    ports: ["5000:5000", "5001:5001"]
    depends_on: [postgres]
    environment:
      MARQUEZ_DB_HOST: postgres
      MARQUEZ_DB_USER: marquez
      MARQUEZ_DB_PASSWORD: marquez
  marquez-web:
    image: marquezproject/marquez-web:0.45.0
    ports: ["3000:3000"]
    depends_on: [marquez]
    environment:
      MARQUEZ_HOST: marquez
      MARQUEZ_PORT: "5000"

Then:

docker compose -f docker-compose.marquez.yaml up -d
# UI: http://localhost:3000
# API: http://localhost:5000

2. Point the edge at Marquez

lineage:
  enabled: true
  transport: http
  http:
    endpoint: http://localhost:5000/api/v1/lineage
    timeout: 3s

Restart the edge.

3. Run a pipeline

Submit any pipeline. A short-lived pipeline from a generate input to a file output is enough:

expanso job submit --file - <<'EOF'
type: pipeline
name: lineage-demo
config:
  input:
    generate:
      mapping: 'root = "hello-" + uuid_v4()'
      count: 100
      interval: 50ms
  output:
    file:
      path: /tmp/lineage-demo-out.txt
EOF

4. Inspect events in Marquez

Open http://localhost:3000, navigate to the default namespace, find the lineage-demo job, and open its latest run:

  • Two events appear in order: START, then COMPLETE, sharing one runId.
  • The input resolves to an expanso://generate Dataset, and the output to a file://localhost Dataset.
  • The custom expanso_run facet shows non-zero records_in, records_out, bytes_in, bytes_out.
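You can also script the same check against Marquez's REST API instead of using the UI. The Python sketch below prints each run's id and state. It rests on three assumptions:

- Marquez serves runs at GET /api/v1/namespaces/{namespace}/jobs/{job}/runs on port 5000, as published by the compose file above.
- The response lists them under a runs key.
- The edge registers the job in the default namespace, as in the walkthrough.

Adjust these if your setup differs. The function names are hypothetical.

```python
import json
import sys
import urllib.parse
import urllib.request

MARQUEZ_API = "http://localhost:5000"  # API port published by the compose file


def runs_url(namespace: str, job: str, base: str = MARQUEZ_API) -> str:
    """Build the Marquez runs URL, percent-encoding namespace and job names."""
    ns = urllib.parse.quote(namespace, safe="")
    jb = urllib.parse.quote(job, safe="")
    return f"{base}/api/v1/namespaces/{ns}/jobs/{jb}/runs"


def latest_runs(namespace: str, job: str) -> list[dict]:
    """Fetch the job's runs; assumes the response body is {"runs": [...]}."""
    with urllib.request.urlopen(runs_url(namespace, job), timeout=5) as resp:
        return json.load(resp).get("runs", [])


if __name__ == "__main__":
    job = sys.argv[1] if len(sys.argv) > 1 else "lineage-demo"
    for run in latest_runs("default", job):
        print(run.get("id"), run.get("state"))
```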

To tear down:

docker compose -f docker-compose.marquez.yaml down -v

Failure handling

The runtime contract is strict: lineage emission never blocks or fails the pipeline. Failures route to the per-execution HealthTracker and the single dropped-events counter.

| Failure | Counter | HealthTracker | Pipeline impact |
|---|---|---|---|
| Queue full (overflow) | lineage_events_dropped_total +1 | Records "lineage" component error | None |
| HTTP 4xx / 5xx / network | lineage_events_dropped_total +1 | Records "lineage" component error | None |
| File write error | lineage_events_dropped_total +1 | Records "lineage" component error | None |

lineage_events_dropped_total is exported through the standard telemetry pipeline. It is a single series with no attributes, so it adds no cardinality.

Lineage component errors are visible per-execution:

expanso execution describe <execution-id>

When the lineage backend is unreachable, pipelines continue to complete normally; only the events drop.

Dataset extraction

The edge inspects each pipeline's input and output components and maps them to OpenLineage Datasets. Common mappings:

| Component | Namespace | Name |
|---|---|---|
| file, csv | file://localhost | absolute path |
| aws_s3, s3 | s3://<bucket> | object path |
| kafka, kafka_franz, redpanda | kafka://<seed_brokers[0]> | first topic |
| nats, nats_jetstream | nats://<urls[0]> | subject or stream |
| gcp_pubsub | pubsub://<project> | subscription (input) or topic (output) |
| http_server | http://<host> | listen path |
| http_client | <scheme>://<host> | URL path |
| sql_select, sql_insert | <driver>://<dsn-host> | table |
| stdin, stdout | stdio://localhost | stdin or stdout |
| generate | expanso://generate | label, or a synthetic name |
| anything else | unknown://<component-type> | label, if present |

Composite shapes (broker fan-in and fan-out, and switch) are walked recursively: a broker with five outputs produces five output Datasets. Unknown components map to unknown:// namespaces and still produce schema-valid events.
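The Python sketch below restates a few rows of the table, plus the recursive walk, as a small function. It covers:

- file, in its output form with a single path,
- kafka, kafka_franz, and redpanda,
- generate,
- the unknown:// fallback,
- broker and switch recursion.

It models the documented mapping and is not the edge's code. Several details are assumptions:

- The config keys it reads: path, seed_brokers, topics or topic, broker.outputs or broker.inputs, and switch.cases[].output.
- The synthetic name for an unlabeled generate input.
- The name used when an unknown component has no label.

```python
import os
from typing import Any

Dataset = tuple[str, str]  # (namespace, name)

# Hypothetical placeholder; the edge's actual synthetic name is not documented here.
SYNTHETIC_GENERATE_NAME = "generate"


def datasets(component: dict[str, Any]) -> list[Dataset]:
    """Map one input/output component config to OpenLineage (namespace, name) pairs."""
    label = component.get("label")
    kind = next(k for k in component if k != "label")  # the component type key
    conf = component[kind] or {}

    # Composite shapes are walked recursively: one Dataset per leaf component.
    if kind == "broker":
        children = conf.get("outputs", []) + conf.get("inputs", [])
        return [d for child in children for d in datasets(child)]
    if kind == "switch":
        return [d for case in conf.get("cases", []) for d in datasets(case["output"])]

    if kind == "file":
        return [("file://localhost", os.path.abspath(conf["path"]))]
    if kind in ("kafka", "kafka_franz", "redpanda"):
        topics = conf.get("topics") or [conf.get("topic")]
        return [(f"kafka://{conf['seed_brokers'][0]}", topics[0])]
    if kind == "generate":
        return [("expanso://generate", label or SYNTHETIC_GENERATE_NAME)]
    # Fallback: namespaced by component type; using the type as the name
    # when no label is present is an assumption of this sketch.
    return [(f"unknown://{kind}", label or kind)]
```

The recursion is what gives a five-output broker its five Datasets: each child is mapped independently, and the results are concatenated.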

Event signing

Every emitted event carries a signature facet under run.facets.signature containing an Ed25519 signature over the canonical event bytes. Consumers that need tamper-evident audit trails can verify the signature with the producing edge node's public key.

The full verification protocol — including the canonicalization rule that verifiers must match byte-for-byte to avoid drift — is documented in Verify Lineage Events, with worked Go and Python examples.

Pairing with the metadata processor

The lineage emitter and the metadata processor are complementary surfaces:

  • The lineage emitter sends one event per pipeline lifecycle transition to a lineage backend (Marquez, etc.). The audience is data catalogs and governance tooling.
  • The metadata processor stamps every message with OpenLineage-aligned identity fields (run_id, job_name, job_namespace, event_time, producer). The audience is downstream pipelines and storage that need lineage context inline with the data.

The run_id produced by the metadata processor matches the runId field on lineage events for the same execution, so consumers can correlate event-stream records back to the lifecycle events the edge emitted.
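Correlation is then a join on the run identifier. The Python sketch below groups lifecycle events by run.runId and attaches each record to its run through the run_id field stamped by the metadata processor. It assumes that run_id is a top-level key in each record; adapt the lookup to wherever your pipeline places the metadata. The function name correlate is hypothetical.

```python
from collections import defaultdict
from typing import Any, Iterable


def correlate(
    lifecycle_events: Iterable[dict[str, Any]],
    records: Iterable[dict[str, Any]],
) -> dict[str, dict[str, Any]]:
    """Group lifecycle events and data records by run id.

    Lifecycle events carry the id at run.runId (OpenLineage RunEvent shape);
    records are assumed to carry it as a top-level run_id key.
    """
    runs: dict[str, dict[str, Any]] = defaultdict(
        lambda: {"events": [], "records": []}
    )
    for event in lifecycle_events:
        runs[event["run"]["runId"]]["events"].append(event["eventType"])
    for record in records:
        runs[record["run_id"]]["records"].append(record)
    return dict(runs)
```

A run that has records but no lifecycle events is worth flagging. Under the at-most-once delivery contract, it most likely means those events were dropped.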