
Metadata

Each message has raw contents and metadata, which is a map of key/value pairs representing an arbitrary amount of complementary data.

When an input protocol supports attributes or metadata, they are automatically added to your messages; refer to the respective input documentation for a list of metadata keys. When an output supports attributes or metadata, any metadata key/value pairs in a message will be sent (subject to service limits).

Execution Context Metadata

In addition to metadata supplied by inputs, Expanso Edge automatically injects context about the running pipeline and the node executing it. These values are available on every message and can be referenced with the @ operator in Bloblang or ${! metadata("...") } in interpolated fields.

Pipeline and node identifiers

| Metadata key | Description |
| --- | --- |
| @pipeline_id | The pipeline (job) identifier |
| @pipeline_version | The version of the pipeline being executed |
| @execution_id | The unique identifier for this pipeline execution |
| @node_id | The edge node running the pipeline |
| @namespace | The namespace this execution belongs to |

@job_id and @job_version are accepted as aliases for @pipeline_id and @pipeline_version.
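As a sketch of how these keys might be used, the configuration below reads one key through its alias in a mapping and references two others from an interpolated field. It assumes the log processor is available in your Expanso Edge build, and the source_pipeline field name is a hypothetical choice.

```yaml
pipeline:
  processors:
    # @job_id is an alias, so this reads the same value as @pipeline_id.
    # The `source_pipeline` field name is hypothetical.
    - mapping: |
        root = this
        root.source_pipeline = @job_id

    # Interpolated fields reach the same keys through metadata().
    # Assumes the `log` processor is available in your build.
    - log:
        level: INFO
        message: 'execution ${! metadata("execution_id") } running on node ${! metadata("node_id") }'
```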

Node labels

Any labels configured on the edge node are exposed as metadata under the node_label_ prefix. For example, a node configured with:

labels:
  env: local
  region: us-west-2

makes @node_label_env and @node_label_region available in every pipeline running on that node:

root.environment = @node_label_env  # "local"
root.region = @node_label_region    # "us-west-2"

This is useful for tagging output with the node's identity, branching pipeline logic by environment, or routing data to different destinations depending on where the pipeline runs:

pipeline:
  processors:
    - mapping: |
        root = this
        meta target_bucket = if @node_label_env == "production" {
          "prod-events"
        } else {
          "dev-events"
        }

output:
  aws_s3:
    bucket: ${! metadata("target_bucket") }
    path: ${! @node_id }/${! timestamp_unix() }.json

Label key sanitization

Bloblang metadata keys must be valid identifiers, so label keys are sanitized before injection:

  • Characters in [a-zA-Z0-9_-] are preserved.
  • Any other character (., space, :, /, etc.) is replaced with _.
  • Keys are case-sensitive.

| Label key | Bloblang reference |
| --- | --- |
| env | @node_label_env |
| team.name | @node_label_team_name |
| gpu-class | @node_label_gpu-class |
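A minimal sketch of reading sanitized label keys, assuming a node configured with the team.name and gpu-class labels from the table. The metadata() function takes the key as a plain string, which is convenient when a sanitized key contains a hyphen. The .or() fallbacks cover nodes that lack a label, and the team and gpu_class output fields are hypothetical.

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # The label "team.name" was sanitized to node_label_team_name.
        root.team = @node_label_team_name.or("unassigned")
        # Pass the sanitized key as a string to metadata().
        root.gpu_class = metadata("node_label_gpu-class").or("none")
```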

When labels change

Labels are read from the node configuration once when the pipeline starts. Updating the node's labels: block only affects pipelines launched after the change; in-flight executions keep the labels they started with.

The metadata Processor

The implicit context keys above are always available. To bulk-attach runtime metadata to messages, including OpenLineage-aligned identity, runtime counters, or custom static fields, use the metadata processor, which provides a declarative form:

pipeline:
  processors:
    - metadata:
        include: [core, pipeline, node]
        custom:
          pipeline_owner: "[email protected]"
        target: body
        format: nested
        body_key: lineage

The implicit keys above (@pipeline_id, @node_id, etc.) and the processor's category fields (pipeline_name, node_id, etc.) are independent surfaces, so pick whichever shape is easier to consume downstream. See the metadata processor reference for the full list of keys and the format/target matrix.
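If you prefer the implicit keys, a hand-written mapping can produce a comparable nested block in the body. This is a sketch only: the lineage field and its child names are assumptions and do not reproduce the metadata processor's own schema (for example, it writes pipeline_id rather than pipeline_name).

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Hand-built lineage block from the implicit context keys.
        # Field names are illustrative, not the processor's schema.
        root.lineage = {
          "pipeline_id": @pipeline_id,
          "pipeline_version": @pipeline_version,
          "execution_id": @execution_id,
          "node_id": @node_id,
          "namespace": @namespace
        }
```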

Editing Metadata

Expanso Edge allows you to add and remove metadata using the mapping processor. For example, you can do something like this in your pipeline:

pipeline:
  processors:
    - mapping: |
        # Remove all existing metadata from messages
        meta = deleted()

        # Add a new metadata field `time` from the contents of a JSON
        # field `event.timestamp`
        meta time = event.timestamp

You can also use Bloblang to delete individual metadata keys with:

meta foo = deleted()

Or do more interesting things like remove all metadata keys with a certain prefix:

meta = @.filter(kv -> !kv.key.has_prefix("kafka_"))
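The prefix filter can be combined with other conditions. As a sketch, the mapping below drops every kafka_ key except kafka_key and then deletes one more key by name; both the kept key and internal_trace are hypothetical choices.

```yaml
pipeline:
  processors:
    - mapping: |
        # Keep kafka_key, drop every other key starting with kafka_.
        meta = @.filter(kv -> !kv.key.has_prefix("kafka_") || kv.key == "kafka_key")
        # Delete a single key by name (hypothetical key).
        meta internal_trace = deleted()
```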

Using Metadata

Caution

There are two functions for referencing metadata: meta() and metadata(). meta() has been deprecated in favor of metadata().

Metadata values can be referenced in any field that supports interpolation functions. For example, you can route messages to Kafka topics using interpolation of metadata keys:

output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: ${! metadata("target_topic") }
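The target_topic key has to be set somewhere upstream. A minimal sketch, assuming messages carry a string type field (hypothetical) and that topics named events-* exist in your cluster; the .or() calls supply fallbacks when the field or key is missing.

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Derive the topic from a hypothetical `type` field in the body.
        meta target_topic = "events-" + this.type.or("unknown")

output:
  kafka:
    addresses: [ localhost:9092 ]
    # Fall back to a default topic if the key was never set.
    topic: ${! metadata("target_topic").or("events-default") }
```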

You can also conditionally process messages based on their metadata with the switch processor:

pipeline:
  processors:
    - switch:
        - check: '@doc_type == "nested"'
          processors:
            - sql_insert:
                driver: mysql
                dsn: foouser:foopassword@tcp(localhost:3306)/foodb
                table: footable
                columns: [ foo, bar, baz ]
                args_mapping: |
                  root = [
                    this.document.foo,
                    this.document.bar,
                    @kafka_topic,
                  ]
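Cases are evaluated in order and, unless fallthrough is enabled, only the first matching case runs; a case with no check always matches, so a final case can act as a catch-all. A sketch, with hypothetical doc_kind values:

```yaml
pipeline:
  processors:
    - switch:
        - check: '@doc_type == "nested"'
          processors:
            - mapping: |
                root = this
                root.doc_kind = "nested"
        # No `check`, so this case matches anything the first case did not.
        - processors:
            - mapping: |
                root = this
                root.doc_kind = "other"
```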

Restricting Metadata

Outputs that support metadata, headers, or some other variant of enriched fields on messages will attempt to send all metadata key/value pairs by default. However, it is sometimes useful to refer to metadata fields at the output level without sending them along with the data. In this case, you can restrict the metadata keys that are sent with the metadata.exclude_prefixes field in the respective output config.

For example, if we were sending messages to Kafka using a metadata key target_topic to determine the topic, but wanted to prevent that key from being sent as a header, we could use the following configuration:

output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: ${! metadata("target_topic") }
    metadata:
      exclude_prefixes:
        - target_topic
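Because exclude_prefixes takes a list, several groups of keys can be held back at once. For instance, if you would rather not forward the injected execution context keys as Kafka headers, a sketch might exclude them by prefix, assuming they reach the output like any other metadata. Note that node_ matches both node_id and every node_label_ key.

```yaml
output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: ${! metadata("target_topic") }
    metadata:
      exclude_prefixes:
        - target_topic
        # Injected execution context (see Execution Context Metadata).
        - pipeline_
        - node_
        - execution_id
        - namespace
```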

When the list of metadata keys that we do not want to send is large, it can help to use a Bloblang mapping to give all of these "private" keys a common prefix:

pipeline:
  processors:
    # Has an explicit list of public metadata keys, and everything else is given
    # an underscore prefix.
    - mapping: |
        let allowed_meta = [
          "foo",
          "bar",
          "baz",
        ]
        meta = @.map_each_key(key -> if !$allowed_meta.contains(key) {
          "_" + key
        })

output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: ${! metadata("_target_topic") }
    metadata:
      exclude_prefixes: [ "_" ]
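If the output never needs to reference the non-public keys, a simpler alternative is to delete them outright instead of renaming them. A sketch reusing the same allowlist; note that this also removes target_topic, so the output below uses a fixed topic whose name is hypothetical.

```yaml
pipeline:
  processors:
    - mapping: |
        let allowed_meta = [
          "foo",
          "bar",
          "baz",
        ]
        # Keep only allowlisted keys; everything else is deleted.
        meta = @.filter(kv -> $allowed_meta.contains(kv.key))

output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: events # hypothetical fixed topic
```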