
Promote Inbound Metadata into the Body

This page collects four worked pipelines that use the metadata processor's promote block to copy input-supplied message metadata into the JSON body as fields. The driving use case is SCADA / IIoT: a Sparkplug B device stamps spb_metric_name, spb_datatype, spb_timestamp, … on every metric and emits a scalar value as the body. Without promote, every pipeline that lands these fields in a columnar store has to repeat the same mapping block for each field. With promote, it's one declarative section.

The feature is fully general: anywhere an input stamps metadata you want as columns, promote removes the boilerplate.

For the full configuration surface, see the promote field reference.

1. Sparkplug B — scalar body, ~20 spb_* keys

The canonical SCADA shape. A Sparkplug B input emits each metric as a scalar value (e.g. 23.5) and stamps the metric name, datatype, timestamp, sequence, alias, quality, source device, and a dozen other fields as spb_* metadata. Promote them all in one section, rename a few for column-friendly names, cast the numeric and boolean ones, and stamp Expanso provenance on top.

```yaml
pipeline:
  processors:
    - metadata:
        include: [core, pipeline, node]
        target: body
        format: flat
        promote:
          match: "spb_" # every spb_* key the input stamped
          rename:
            spb_metric_name: metric
            spb_datatype: datatype
            spb_timestamp: timestamp
            spb_sequence: sequence
            spb_is_historical: is_historical
            spb_alias: alias
            spb_device_id: device_id
          cast:
            spb_timestamp: int # epoch millis -> int64
            spb_sequence: int # sequence number -> int64
            spb_is_historical: bool # "true"/"false" -> bool
          wrap_key: value # 23.5 -> { "value": 23.5, ... }

output:
  stdout:
    codec: lines
```

Input message (the body is the scalar 23.5; metadata is set by the input):

```yaml
body: 23.5
metadata:
  spb_metric_name: "Temperature"
  spb_datatype: "Float"
  spb_timestamp: "1730000000000"
  spb_sequence: "42"
  spb_is_historical: "false"
  spb_alias: "17"
  spb_device_id: "plc-12"
```

Output:

```json
{
  "value": 23.5,
  "metric": "Temperature",
  "datatype": "Float",
  "timestamp": 1730000000000,
  "sequence": 42,
  "is_historical": false,
  "alias": "17",
  "device_id": "plc-12",
  "run_id": "01HF...",
  "job_name": "scada-ingest",
  "job_namespace": "default",
  "event_time": "2026-04-30T12:34:56.789Z",
  "producer": "https://expanso.io/edge/v1.4.2",
  "pipeline_name": "scada-ingest",
  "pipeline_version": "v3",
  "node_id": "edge-eu-west-1-a",
  "hostname": "edge-01",
  "region": "eu-west-1",
  "environment": "production",
  "cluster_name": "edge-eu",
  "agent_version": "1.4.2"
}
```

Walking through the configuration:

  • target: body, format: flat. The promote block requires target: body. The flat format puts every field at the JSON root, so downstream Parquet / Delta sees them as top-level columns. Switch to format: nested (with body_key) if you want a nested structure instead.
  • match: "spb_". A literal prefix match: a match value is treated as a prefix unless it is wrapped in slashes (/regex/).
  • rename: keyed by the inbound metadata key. cast uses the same inbound keys, so a key that is both renamed and cast (spb_timestamp, spb_sequence, spb_is_historical) appears under its spb_* name in both maps.
  • cast: int, float, or bool. A value that fails to parse (or, for float, parses to NaN / ±Inf) falls back to the original string and does not increment error_count; a recoverable cast fallback is not a pipeline error.
  • wrap_key: value. The body 23.5 is not an object, so promote wraps it as { "value": 23.5 } before adding fields. Without promote, a non-object body is passed through unchanged with a warning; wrapping happens only when promote is active.
  • Precedence: original body < promoted fields < resolved Expanso provenance. The resolved run_id, node_id, pipeline_name, … always win, so even if an inbound key such as spb_node_id were renamed to node_id, it could not overwrite Expanso's own provenance.
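For readers who want to check the layering offline, here is a minimal Go sketch of the wrap and precedence rules above. buildFlatBody is a hypothetical helper written for this page, not the Expanso runtime's implementation, and it skips the cast and reserved-name steps.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildFlatBody is a hypothetical sketch of the flat-format layering:
// original body < promoted fields < resolved Expanso provenance.
func buildFlatBody(body any, wrapKey string, promoted, resolved map[string]any) map[string]any {
	out := map[string]any{}
	if obj, ok := body.(map[string]any); ok {
		// Object bodies keep their existing fields.
		for k, v := range obj {
			out[k] = v
		}
	} else {
		// Non-object bodies (e.g. the scalar 23.5) are wrapped under wrapKey.
		out[wrapKey] = body
	}
	// Promoted fields overwrite same-named body fields.
	for k, v := range promoted {
		out[k] = v
	}
	// Resolved provenance is written last, so it always wins.
	for k, v := range resolved {
		out[k] = v
	}
	return out
}

func main() {
	out := buildFlatBody(
		23.5,
		"value",
		map[string]any{"metric": "Temperature", "node_id": "spoofed"},
		map[string]any{"node_id": "edge-eu-west-1-a"},
	)
	b, err := json.Marshal(out)
	if err != nil {
		panic(err)
	}
	// encoding/json sorts map keys, so this prints:
	// {"metric":"Temperature","node_id":"edge-eu-west-1-a","value":23.5}
	fmt.Println(string(b))
}
```

Writing the resolved provenance last is what makes the "always wins" guarantee hold regardless of what the input stamped.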

2. Kafka headers — promote selected headers as columns

A Kafka input stamps every header as kafka_<header-name> metadata. Promote only the ones you want as columns, leave the noisy ones alone, and cast the timestamp.

```yaml
input:
  kafka:
    addresses: [ "broker:9092" ]
    topics: [ "orders.events" ]
    consumer_group: "edge-orders"

pipeline:
  processors:
    - metadata:
        target: body
        format: flat
        promote:
          names:
            - kafka_tenant_id
            - kafka_event_type
            - kafka_correlation_id
            - kafka_ingest_timestamp
          rename:
            kafka_tenant_id: tenant
            kafka_event_type: event_type
            kafka_correlation_id: correlation_id
            kafka_ingest_timestamp: ingest_ts
          cast:
            kafka_ingest_timestamp: int

output:
  aws_s3:
    bucket: orders-events
    # promote copies into the body; the metadata key keeps its inbound name
    path: ${! @kafka_tenant_id }/${! timestamp_unix() }.json
    codec: lines
```

Why names and not match. Kafka stamps a lot of internal kafka_* keys (kafka_topic, kafka_partition, kafka_offset, …). A broad match: "kafka_" would pull all of them into the body. Listing the specific headers via names is more deliberate and keeps the columnar schema tight.
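The difference between the two selection modes can be sketched in Go. selectByPrefix and selectByNames are hypothetical helpers and the metadata values are made up for illustration; the sketch also assumes that a listed name missing from a message is simply skipped rather than treated as an error.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// selectByPrefix is a hypothetical mirror of a literal-prefix match such as match: "kafka_".
func selectByPrefix(meta map[string]string, prefix string) []string {
	var keys []string
	for k := range meta {
		if strings.HasPrefix(k, prefix) {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys) // map iteration order is random; sort for stable output
	return keys
}

// selectByNames is a hypothetical mirror of names: [...].
// Assumption: listed keys that are absent from the message are skipped.
func selectByNames(meta map[string]string, names []string) []string {
	var keys []string
	for _, n := range names {
		if _, ok := meta[n]; ok {
			keys = append(keys, n)
		}
	}
	return keys
}

func main() {
	meta := map[string]string{
		"kafka_topic":      "orders.events",
		"kafka_partition":  "3",
		"kafka_offset":     "1042",
		"kafka_tenant_id":  "acme",
		"kafka_event_type": "order.created",
	}
	// [kafka_event_type kafka_offset kafka_partition kafka_tenant_id kafka_topic]
	fmt.Println(selectByPrefix(meta, "kafka_"))
	// [kafka_tenant_id kafka_event_type]
	fmt.Println(selectByNames(meta, []string{"kafka_tenant_id", "kafka_event_type", "kafka_correlation_id"}))
}
```

The prefix form pulls in kafka_topic, kafka_partition and kafka_offset as well, which is exactly the schema noise the names form avoids.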

3. MQTT topic parts via regex

An MQTT input stamps the full topic as mqtt_topic. To break a topic like building/floor-3/zone-12/temperature into building, floor, zone, and metric columns, use a mapping step to split the topic into per-part metadata keys and then promote them with a regex match:

```yaml
input:
  mqtt:
    urls: [ "tcp://broker:1883" ]
    topics: [ "building/+/+/+" ]

pipeline:
  processors:
    # First, split the topic into per-part metadata keys.
    - mapping: |
        let parts = metadata("mqtt_topic").split("/")
        meta topic_building = $parts.index(0).or("")
        meta topic_floor = $parts.index(1).or("")
        meta topic_zone = $parts.index(2).or("")
        meta topic_metric = $parts.index(3).or("")

    # Then promote everything that starts with topic_ into the body.
    - metadata:
        target: body
        format: flat
        promote:
          match: "/^topic_/" # regex: enclosing slashes mark a regex
          rename:
            topic_building: building
            topic_floor: floor
            topic_zone: zone
            topic_metric: metric
          wrap_key: payload # MQTT body is opaque bytes, so wrap it as "payload"

output:
  parquet:
    # the metadata key keeps its pre-rename name (topic_building)
    path: /var/lib/expanso/parquet/${! @topic_building }-${! timestamp_unix() }.parquet
```
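The split-and-default logic in the mapping step can be sketched in Go for offline testing of topic layouts. topicParts is a hypothetical helper; its key names mirror the mapping above, and an empty string stands in for .or("") when a segment is missing.

```go
package main

import (
	"fmt"
	"strings"
)

// topicParts is a hypothetical Go equivalent of the mapping step:
// split on "/" and default any missing segment to "" (like .or("")).
func topicParts(topic string) map[string]string {
	names := []string{"topic_building", "topic_floor", "topic_zone", "topic_metric"}
	parts := strings.Split(topic, "/")
	out := make(map[string]string, len(names))
	for i, name := range names {
		if i < len(parts) {
			out[name] = parts[i]
		} else {
			out[name] = ""
		}
	}
	return out
}

func main() {
	p := topicParts("building/floor-3/zone-12/temperature")
	// building floor-3 zone-12 temperature
	fmt.Println(p["topic_building"], p["topic_floor"], p["topic_zone"], p["topic_metric"])
}
```

Because the building/+/+/+ filter fixes the first topic level to the literal building, topic_building (and therefore the building column and the Parquet file prefix) is always the string building with this subscription; the varying values come from segments 1 to 3.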

/^topic_/ is a regex because it is wrapped in slashes. A bare topic_ would be a literal prefix; both forms work here, but a regex gives you Go's regexp (RE2) syntax when you need it: anchors, character classes, alternation. Note that RE2 does not support lookahead or backreferences. The empty regex // is rejected at submission because it would match every metadata key.
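The two match forms and the empty-regex check can be sketched in Go. compileMatcher is hypothetical, and it assumes a /regex/ is an unanchored search (as Go's Regexp.MatchString is), which is why the example pins the start with ^.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
	"strings"
)

// compileMatcher is a hypothetical sketch of how a promote match string could be
// interpreted: "/expr/" compiles expr with Go's regexp (RE2), anything else is a
// literal prefix, and the empty regex "//" is rejected.
func compileMatcher(match string) (func(string) bool, error) {
	if len(match) >= 2 && strings.HasPrefix(match, "/") && strings.HasSuffix(match, "/") {
		expr := match[1 : len(match)-1]
		if expr == "" {
			return nil, errors.New(`empty regex "//" would match every metadata key`)
		}
		re, err := regexp.Compile(expr)
		if err != nil {
			return nil, err // e.g. lookahead such as (?=...) is not RE2 syntax
		}
		return re.MatchString, nil // unanchored: use ^ to pin the start
	}
	prefix := match
	return func(key string) bool { return strings.HasPrefix(key, prefix) }, nil
}

func main() {
	re, _ := compileMatcher("/^topic_/")
	lit, _ := compileMatcher("topic_")
	fmt.Println(re("topic_zone"), re("mqtt_topic"))   // true false
	fmt.Println(lit("topic_zone"), lit("mqtt_topic")) // true false
	_, err := compileMatcher("//")
	fmt.Println(err != nil) // true
}
```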

4. Promote file_path from the tail input

The tail input stamps every line with file_path (absolute) and file_name (basename) metadata. Promote them into the body so downstream stores see them as columns, and use the promote block alongside lineage stamping:

```yaml
input:
  tail:
    paths: [ "/var/log/myapp/*.log" ]

pipeline:
  processors:
    - mapping: |
        # Try to parse each line as a JSON object; fall back to wrapping the raw text.
        let raw = content().string()
        let parsed = $raw.parse_json().catch(null)
        root = if $parsed.type() == "object" { $parsed } else { { "message": $raw } }

    - metadata:
        include: [core, pipeline, node] # also stamp Expanso provenance
        target: body
        format: nested
        body_key: provenance
        promote:
          names: [file_path, file_name]
          rename:
            file_path: source_file
            file_name: source_basename

output:
  http_client:
    url: https://ingest.example.com/v1/logs
    verb: POST
```
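The parse-or-wrap behavior of the mapping step at the top of this pipeline can be sketched in Go for offline testing. normalizeLine is a hypothetical helper that mirrors the intended behavior (keep JSON objects, wrap everything else), not Bloblang's exact semantics.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// normalizeLine is a hypothetical Go equivalent of the mapping step: keep a
// line that parses as a JSON object, otherwise wrap the raw text as "message".
func normalizeLine(line []byte) map[string]any {
	var obj map[string]any
	// Unmarshal fails for non-object JSON (arrays, strings, numbers) and for
	// plain text; the literal null succeeds but leaves obj nil.
	if err := json.Unmarshal(line, &obj); err == nil && obj != nil {
		return obj
	}
	return map[string]any{"message": string(line)}
}

func main() {
	fmt.Println(normalizeLine([]byte(`{"level":"info","msg":"started"}`)))
	fmt.Println(normalizeLine([]byte("plain text line")))
}
```

Checking for an object explicitly, rather than only for a parse error, keeps lines such as null or [1,2] from reaching the metadata step as non-object bodies.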

What you get. Every emitted message keeps whatever the upstream mapping step produced at the JSON root, plus a provenance object carrying the promoted source_file and source_basename alongside run_id, node_id, pipeline_name, etc. In nested format, promoted fields go inside the nested object, and the precedence rule (promoted < resolved) applies there.

Common variations

Promote everything except reserved fields

match: "/.+/" selects every metadata key. The runtime drops any key whose source name or rename target is reserved (node_id, pipeline_id, node_label_*, …; see the reserved-name guard), so a permissive match cannot collide with Expanso's own provenance fields.

```yaml
promote:
  match: "/.+/"
```

A bare empty regex (match: "//") is rejected at submission rather than silently acting as a match-everything pattern; the schema closes that ambiguity deliberately.
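The guard itself can be sketched in Go. reservedExact, reservedPrefixes, isReserved and promoteSelected are all hypothetical, and the reserved list is only the illustrative subset named above (node_id, pipeline_id, node_label_*); the real list lives in the reserved-name guard reference.

```go
package main

import (
	"fmt"
	"strings"
)

// Hypothetical, illustrative subset of reserved names only.
var reservedExact = map[string]bool{"node_id": true, "pipeline_id": true}
var reservedPrefixes = []string{"node_label_"}

// isReserved is a hypothetical check against the illustrative subset above.
func isReserved(name string) bool {
	if reservedExact[name] {
		return true
	}
	for _, p := range reservedPrefixes {
		if strings.HasPrefix(name, p) {
			return true
		}
	}
	return false
}

// promoteSelected is a hypothetical sketch: rename each selected key, then drop
// it if either its source name or its rename target is reserved.
func promoteSelected(meta, rename map[string]string) map[string]string {
	out := map[string]string{}
	for src, v := range meta {
		dst := src
		if r, ok := rename[src]; ok {
			dst = r
		}
		if isReserved(src) || isReserved(dst) {
			continue
		}
		out[dst] = v
	}
	return out
}

func main() {
	meta := map[string]string{
		"node_id":         "from-input",
		"node_label_zone": "z1",
		"spb_node_id":     "edge-7",
		"spb_metric_name": "Temperature",
	}
	rename := map[string]string{"spb_node_id": "node_id", "spb_metric_name": "metric"}
	fmt.Println(promoteSelected(meta, rename)) // map[metric:Temperature]
}
```

Checking both the source name and the rename target closes the two routes a permissive match could otherwise use: an inbound key that is already reserved, and an innocuous key renamed onto a reserved one.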

Promote with format: nested

If your downstream consumer expects a nested structure rather than flat columns, set format: nested and body_key. Promoted fields go inside the nested object alongside the resolved Expanso fields:

```yaml
- metadata:
    include: [core, pipeline]
    target: body
    format: nested
    body_key: scada
    promote:
      match: "spb_"
      rename: { spb_metric_name: metric }
```

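With the spb_* metadata from the first example, a scalar body 23.5 becomes (abbreviated: the other promoted spb_* keys and the remaining resolved fields are omitted):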

```json
{
  "value": 23.5,
  "scada": {
    "metric": "Temperature",
    "spb_datatype": "Float",
    "run_id": "01HF...",
    "node_id": "edge-eu-west-1-a"
  }
}
```

wrap_key defaults to value. In nested format, wrap_key must not equal body_key — the nested metadata object would otherwise clobber the wrapped body value, and the validator rejects this at submission.
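A minimal Go sketch of the nested layout and the wrap_key / body_key check. buildNestedBody is hypothetical; it places promoted and resolved fields under body_key with resolved fields winning, as the nested-format rule states, and rejects the clashing combination up front the way the validator does at submission.

```go
package main

import "fmt"

// buildNestedBody is a hypothetical sketch of format: nested: the body (wrapped
// if it is not an object) stays at the root, and promoted plus resolved fields go
// under bodyKey, with resolved fields winning on conflicts.
func buildNestedBody(body any, wrapKey, bodyKey string, promoted, resolved map[string]any) (map[string]any, error) {
	if wrapKey == "" {
		wrapKey = "value" // documented default
	}
	if wrapKey == bodyKey {
		// The nested object would overwrite the wrapped body value.
		return nil, fmt.Errorf("wrap_key %q must not equal body_key", wrapKey)
	}
	out := map[string]any{}
	if obj, ok := body.(map[string]any); ok {
		for k, v := range obj {
			out[k] = v
		}
	} else {
		out[wrapKey] = body
	}
	nested := map[string]any{}
	for k, v := range promoted {
		nested[k] = v
	}
	for k, v := range resolved {
		nested[k] = v
	}
	out[bodyKey] = nested
	return out, nil
}

func main() {
	out, err := buildNestedBody(23.5, "", "scada",
		map[string]any{"metric": "Temperature"},
		map[string]any{"node_id": "edge-eu-west-1-a"})
	fmt.Println(out, err)
	_, err = buildNestedBody(23.5, "scada", "scada", nil, nil)
	fmt.Println(err)
}
```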

Cast a metadata value to float, with NaN-safe fallback

A device that occasionally emits "NaN" or "-" for an unset reading would otherwise drop the entire record (a non-finite float fails the body marshal). The float cast deliberately treats NaN / ±Inf / out-of-range as a parse failure and falls back to the original string:

```yaml
promote:
  match: "spb_"
  cast:
    spb_value: float
```

A reading of "23.5" becomes the float 23.5; a reading of "NaN" is kept as the string "NaN". Cast fallbacks are logged at debug level only and do not increment error_count, so a noisy source does not raise false error-count alerts.
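A Go sketch of the three cast rules, including why NaN needs an explicit check: Go's strconv.ParseFloat accepts "NaN" and "Inf" without error, and encoding/json refuses to marshal non-finite floats. castValue is hypothetical, and the bool branch assumes strconv.ParseBool's set of accepted spellings, which may be broader than what the runtime accepts.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math"
	"strconv"
)

// castValue is a hypothetical sketch of the documented casts. It returns the
// typed value and true on success, or the original string and false on a
// recoverable fallback (which, per the docs, would not count toward error_count).
func castValue(raw, kind string) (any, bool) {
	switch kind {
	case "int":
		if n, err := strconv.ParseInt(raw, 10, 64); err == nil {
			return n, true
		}
	case "float":
		f, err := strconv.ParseFloat(raw, 64)
		// ParseFloat accepts "NaN" and "Inf" without error, and returns ±Inf with
		// an error for out-of-range input, so check finiteness explicitly.
		if err == nil && !math.IsNaN(f) && !math.IsInf(f, 0) {
			return f, true
		}
	case "bool":
		// Assumption: strconv.ParseBool's accepted spellings ("true", "1", "F", ...).
		if b, err := strconv.ParseBool(raw); err == nil {
			return b, true
		}
	}
	return raw, false
}

func main() {
	for _, raw := range []string{"23.5", "NaN", "-"} {
		v, ok := castValue(raw, "float")
		fmt.Printf("%q -> %v (%T), cast ok: %v\n", raw, v, v, ok)
	}
	// Why the fallback matters: encoding/json refuses non-finite floats.
	_, err := json.Marshal(map[string]any{"spb_value": math.NaN()})
	fmt.Println("marshal NaN fails:", err != nil)
}
```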

Next Steps