Promote Inbound Metadata into the Body
This page collects four worked pipelines that use the metadata processor's promote block to copy input-supplied message metadata into the JSON body as fields. The driving use case is SCADA / IIoT: a Sparkplug B input stamps spb_metric_name, spb_datatype, spb_timestamp, … on every metric and emits a scalar value as the body. Without promote, every pipeline that lands these metrics in a columnar store has to repeat the same mapping boilerplate for each field. With promote, it's one declarative section.
The feature is fully general: anywhere an input stamps metadata you want as columns, promote removes the boilerplate.
For the full configuration surface, see the promote field reference.
1. Sparkplug B — scalar body, ~20 spb_* keys
The canonical SCADA shape. A Sparkplug B input emits each metric as a scalar value (e.g. 23.5) and stamps the metric name, datatype, timestamp, sequence, alias, quality, source device, and a dozen other fields as spb_* metadata. Promote them all in one section, rename a few for column-friendly names, cast the numeric and boolean ones, and stamp Expanso provenance on top.
pipeline:
  processors:
    - metadata:
        include: [core, pipeline, node]
        target: body
        format: flat
        promote:
          match: "spb_"              # every spb_* key the input stamped
          rename:
            spb_metric_name: metric
            spb_datatype: datatype
            spb_timestamp: timestamp
            spb_sequence: sequence
            spb_is_historical: is_historical
            spb_alias: alias
            spb_device_id: device_id
          cast:
            spb_timestamp: int       # epoch millis -> int64
            spb_sequence: int        # sequence counter -> int
            spb_is_historical: bool  # "true"/"false" -> bool
          wrap_key: value            # 23.5 -> { "value": 23.5, ... }
output:
  stdout:
    codec: lines
Input message (the body is the scalar 23.5; metadata is set by the input):
body: 23.5
metadata:
  spb_metric_name: "Temperature"
  spb_datatype: "Float"
  spb_timestamp: "1730000000000"
  spb_sequence: "42"
  spb_is_historical: "false"
  spb_alias: "17"
  spb_device_id: "plc-12"
Output:
{
"value": 23.5,
"metric": "Temperature",
"datatype": "Float",
"timestamp": 1730000000000,
"sequence": 42,
"is_historical": false,
"alias": "17",
"device_id": "plc-12",
"run_id": "01HF...",
"job_name": "scada-ingest",
"job_namespace": "default",
"event_time": "2026-04-30T12:34:56.789Z",
"producer": "https://expanso.io/edge/v1.4.2",
"pipeline_name": "scada-ingest",
"pipeline_version": "v3",
"node_id": "edge-eu-west-1-a",
"hostname": "edge-01",
"region": "eu-west-1",
"environment": "production",
"cluster_name": "edge-eu",
"agent_version": "1.4.2"
}
Walking through the configuration:
- target: body and format: flat. promote requires target: body. Flat puts every field at the JSON root so downstream Parquet / Delta sees them as top-level columns. Switch to format: nested (with body_key) if you want a nested structure.
- match: "spb_" is a literal prefix match. match matches by prefix unless the pattern is wrapped in slashes (/regex/).
- rename: is keyed by the inbound metadata key, even when a cast for the same key is also configured.
- cast: accepts int, float, or bool. A cast that fails to parse (or, for float, yields NaN / ±Inf) falls back to the string value and does not increment error_count; a recoverable cast fallback is not a pipeline error.
- wrap_key: value applies because the body 23.5 is not an object, so promote wraps it as { "value": 23.5 } before adding fields. Without promote, a non-object body is passed through unchanged with a warning; wrapping only happens when promote is active.
- Precedence: original body < promoted fields < resolved Expanso provenance. The resolved run_id, node_id, pipeline_name, … always win, so an inbound spb_node_id (if some input stamps one) cannot spoof Expanso's own provenance.
2. Kafka headers — promote selected headers as columns
A Kafka input stamps every header as kafka_<header-name> metadata. Promote only the ones you want as columns, leave the noisy ones alone, and cast the timestamp.
input:
  kafka:
    addresses: [ "broker:9092" ]
    topics: [ "orders.events" ]
    consumer_group: "edge-orders"
pipeline:
  processors:
    - metadata:
        target: body
        format: flat
        promote:
          names:
            - kafka_tenant_id
            - kafka_event_type
            - kafka_correlation_id
            - kafka_ingest_timestamp
          rename:
            kafka_tenant_id: tenant
            kafka_event_type: event_type
            kafka_correlation_id: correlation_id
            kafka_ingest_timestamp: ingest_ts
          cast:
            kafka_ingest_timestamp: int
output:
  aws_s3:
    bucket: orders-events
    # @ reads message metadata, so use the inbound key, not the renamed body field.
    path: ${! @kafka_tenant_id }/${! timestamp_unix() }.json
    codec: lines
Why names and not match. Kafka stamps a lot of internal kafka_* keys (kafka_topic, kafka_partition, kafka_offset, …). A broad match: "kafka_" would pull all of them into the body. Listing the specific headers via names is more deliberate and keeps the columnar schema tight.
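To make the difference concrete, here is a small Python sketch that compares the two selection styles on a sample metadata map. The helper names and the sample values are hypothetical, and skipping a listed name that is absent from the message is an assumption rather than documented behavior.

```python
# Illustrative metadata for one Kafka message (values are made up).
metadata = {
    "kafka_topic": "orders.events",
    "kafka_partition": "3",
    "kafka_offset": "1289",
    "kafka_tenant_id": "acme",
    "kafka_event_type": "order.created",
}


def select_by_prefix(meta, prefix):
    """Model of a literal-prefix match: every key that starts with prefix."""
    return {k: v for k, v in meta.items() if k.startswith(prefix)}


def select_by_names(meta, names):
    """Model of names: only the listed keys (absent ones skipped, by assumption)."""
    return {k: meta[k] for k in names if k in meta}


broad = select_by_prefix(metadata, "kafka_")
tight = select_by_names(
    metadata, ["kafka_tenant_id", "kafka_event_type", "kafka_correlation_id"]
)
print(sorted(broad))
print(sorted(tight))
```

The prefix selection pulls in kafka_topic, kafka_partition, and kafka_offset as well, while the names selection yields only the headers that were asked for.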
3. MQTT topic parts via regex
An MQTT input stamps the full topic as mqtt_topic. To break a topic like building/floor-3/zone-12/temperature into building, floor, zone, and metric columns, use a mapping step to split the topic into per-part metadata keys and then promote them with a regex match:
input:
  mqtt:
    urls: [ "tcp://broker:1883" ]
    topics: [ "building/+/+/+" ]
pipeline:
  processors:
    # First, split the topic into per-part metadata keys.
    - mapping: |
        let parts = metadata("mqtt_topic").split("/")
        meta topic_building = $parts.index(0).or("")
        meta topic_floor = $parts.index(1).or("")
        meta topic_zone = $parts.index(2).or("")
        meta topic_metric = $parts.index(3).or("")
    # Then promote everything that starts with topic_ into the body.
    - metadata:
        target: body
        format: flat
        promote:
          match: "/^topic_/"   # regex: the wrapping slashes are the regex marker
          rename:
            topic_building: building
            topic_floor: floor
            topic_zone: zone
            topic_metric: metric
          wrap_key: payload    # MQTT body is opaque bytes, so wrap it as "payload"
output:
  parquet:
    # @ reads message metadata, so use the key set by the mapping step.
    path: /var/lib/expanso/parquet/${! @topic_building }-${! timestamp_unix() }.parquet
/^topic_/ is a regex because it is wrapped in slashes. A bare topic_ would be a literal prefix; both forms work here, but regex gives you the rest of Go's regexp syntax when you need it (anchors, character classes, alternation). Go's regexp package implements RE2, which has no lookahead or backreferences. The empty regex // is rejected at submission because it would match every metadata key.
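The prefix-versus-regex rule can be sketched in Python as follows. This is a model under stated assumptions, not the runtime's code: compile_match is a hypothetical helper, unanchored (search-style) regex matching is assumed, and Python's re module stands in for Go's regexp even though the two dialects differ.

```python
import re


def compile_match(pattern):
    """Return a predicate over metadata keys for a promote-style match string."""
    if len(pattern) >= 2 and pattern.startswith("/") and pattern.endswith("/"):
        body = pattern[1:-1]
        if body == "":
            # Mirrors the documented rule: the empty regex is rejected up front.
            raise ValueError("empty regex // is rejected")
        rx = re.compile(body)
        # Assumed unanchored matching; use ^ and $ in the pattern to anchor.
        return lambda key: rx.search(key) is not None
    # Anything not wrapped in slashes is a literal prefix.
    return lambda key: key.startswith(pattern)


keys = ["topic_building", "topic_floor", "mqtt_topic"]
by_regex = [k for k in keys if compile_match("/^topic_/")(k)]
by_prefix = [k for k in keys if compile_match("topic_")(k)]
unanchored = [k for k in keys if compile_match("/topic/")(k)]
print(by_regex, by_prefix, unanchored)
```

The anchored regex and the bare prefix select the same two keys, while the unanchored /topic/ also picks up mqtt_topic, which is why the example anchors the pattern with ^.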
4. Promote file_path from the tail input
The tail input stamps every line with file_path (absolute) and file_name (basename) metadata. Promote them into the body so downstream stores see them as columns, and use the promote block alongside lineage stamping:
input:
  tail:
    paths: [ /var/log/myapp/*.log ]
pipeline:
  processors:
    - mapping: |
        # Try to parse JSON logs, fall back to raw text.
        let raw = content().string()
        let parsed = $raw.parse_json().catch(null)
        root = if $parsed.type() == "object" { $parsed } else { { "message": $raw } }
    - metadata:
        include: [core, pipeline, node] # also stamp Expanso provenance
        target: body
        format: nested
        body_key: provenance
        promote:
          names: [file_path, file_name]
          rename:
            file_path: source_file
            file_name: source_basename
output:
  http_client:
    url: https://ingest.example.com/v1/logs
    verb: POST
What you get. Every emitted message keeps whatever the upstream mapping step produced at the JSON root and gains a provenance object that carries source_file and source_basename alongside run_id, node_id, pipeline_name, etc. With format: nested, promoted fields land inside the nested object, where the precedence rule still applies (promoted < resolved).
Common variations
Promote everything except reserved fields
match: "/.+/" selects every metadata key. The runtime drops any key whose source name or rename target is reserved (node_id, pipeline_id, node_label_*, …) — see the reserved-name guard — so a permissive match cannot leak Expanso's own provenance.
promote:
  match: "/.+/"
A bare empty regex (match: "//") is rejected at submission instead of silently being a "match-everything" pattern — that ambiguity was deliberately closed in the schema.
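As a rough illustration of the reserved-name guard, the Python sketch below drops any key whose source name or rename target is reserved. The helper names are hypothetical, and the pattern list contains only the names mentioned above; the full reserved list lives in the processor reference and is not reproduced here.

```python
from fnmatch import fnmatchcase

# Only the names this page mentions; the real reserved list is longer.
RESERVED_PATTERNS = ["node_id", "pipeline_id", "node_label_*"]


def is_reserved(name):
    """True if name matches one of the (partial) reserved patterns."""
    return any(fnmatchcase(name, pat) for pat in RESERVED_PATTERNS)


def apply_guard(selected, rename):
    """Drop keys whose source name or rename target is reserved."""
    kept = {}
    for key, value in selected.items():
        target = rename.get(key, key)
        if is_reserved(key) or is_reserved(target):
            continue
        kept[target] = value
    return kept


selected = {
    "node_id": "edge-1",          # reserved source name
    "node_label_zone": "a",       # reserved by wildcard
    "spb_device_id": "plc-12",    # ordinary key
    "spb_x": "1",                 # ordinary key renamed onto a reserved target
}
guarded = apply_guard(selected, {"spb_x": "pipeline_id"})
print(guarded)
```

Only spb_device_id survives: two keys are dropped for their source names and one for its rename target.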
Promote with format: nested
If your downstream consumer expects a nested structure rather than flat columns, set format: nested and body_key. Promoted fields go inside the nested object alongside the resolved Expanso fields:
- metadata:
    include: [core, pipeline]
    target: body
    format: nested
    body_key: scada
    promote:
      match: "spb_"
      rename: { spb_metric_name: metric }
A scalar body 23.5 becomes:
{
"value": 23.5,
"scada": {
"metric": "Temperature",
"datatype": "Float",
"run_id": "01HF...",
"node_id": "edge-eu-west-1-a"
}
}
wrap_key defaults to value. In nested format, wrap_key must not equal body_key — the nested metadata object would otherwise clobber the wrapped body value, and the validator rejects this at submission.
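The collision that the validator guards against is easy to see in a small Python model. This is a sketch with hypothetical function names, not the actual validator or merge code.

```python
def validate_nested(body_key, wrap_key="value"):
    """Model of the submission-time check for nested format."""
    if wrap_key == body_key:
        raise ValueError("wrap_key must not equal body_key in nested format")


def merge_nested(body, promoted, provenance, body_key, wrap_key="value"):
    """Model of nested-format merging for object and non-object bodies."""
    validate_nested(body_key, wrap_key)
    out = dict(body) if isinstance(body, dict) else {wrap_key: body}
    nested = dict(promoted)
    nested.update(provenance)  # resolved fields win inside the nested object
    out[body_key] = nested     # would overwrite the wrapped value if keys collided
    return out


doc = merge_nested(
    23.5,
    {"metric": "Temperature"},
    {"run_id": "01HF..."},
    body_key="scada",
)
print(doc)
```

With body_key set to value, the assignment to out[body_key] would replace the wrapped 23.5, so the model raises before merging.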
Cast a metadata value to float, with NaN-safe fallback
A device that occasionally emits "NaN" or "-" for an unset reading would otherwise drop the entire record (a non-finite float fails the body marshal). The float cast deliberately treats NaN / ±Inf / out-of-range as a parse failure and falls back to the original string:
promote:
  match: "spb_"
  cast:
    spb_value: float
A reading of "23.5" becomes the float 23.5; a reading of "NaN" is kept as the string "NaN". Cast fallbacks are logged at debug only and do not increment error_count, so a noisy source does not trigger false-positive alerts that are keyed on that counter.
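The fallback behavior can be modeled in Python roughly as follows. cast_value is a hypothetical helper, and accepting exactly "true" and "false" for bool is an assumption based on the example values on this page.

```python
import math


def cast_value(raw, kind):
    """Cast a metadata string; on a failed cast, return the original string."""
    try:
        if kind == "int":
            return int(raw)
        if kind == "bool":
            if raw in ("true", "false"):  # assumed accepted spellings
                return raw == "true"
            raise ValueError(raw)
        if kind == "float":
            value = float(raw)
            # NaN and +/-Inf (including overflow) count as parse failures.
            if not math.isfinite(value):
                raise ValueError(raw)
            return value
    except ValueError:
        return raw  # recoverable: keep the string, do not count an error
    raise ValueError(f"unknown cast kind: {kind}")


for raw in ["23.5", "NaN", "-", "1e999"]:
    print(repr(raw), "->", repr(cast_value(raw, "float")))
```

Only "23.5" comes back as a float; the other three readings are returned unchanged as strings, so the record can still be serialized.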
Next Steps
- metadata processor reference: full configuration reference, including the promote field reference, validation rules, and the reserved-name guard.
- Metadata Enrichment for Lineage example: the lineage-focused companion example for the same processor.
- tail input: the log-tailing input that stamps file_path / file_name metadata used in example 4.
- Pipeline metadata guide: the implicit @pipeline_id, @node_id, and @node_label_* keys you do not need promote for.
- Quick Start: how to actually deploy and run these pipelines on an edge node.