Skip to main content

Processors

A processor is a unit of logic that transforms, filters, or otherwise processes messages as they flow through your pipeline:

pipeline:
processors:
- mapping: |
root.message = this.content.uppercase()
root.meta.processed_at = now()

- filter:
check: 'this.type == "important"'

Processors can be configured at the input, pipeline, or output level. They are applied in the order they are defined.

Processor Pipelines

Processors can be nested to create complex transformation logic:

pipeline:
processors:
- branch:
processors:
- cache:
resource: user_cache
operator: get
key: ${! json("user_id") }
result_map: 'root.user = this'

- mapping: |
root.enriched = this.merge(this.user)

Error Handling

Processors can fail during execution. Use the catch processor to handle errors gracefully:

pipeline:
processors:
- try:
- mapping: 'root = this.parse_json()'
- catch:
- log:
message: "Failed to parse: ${! content() }"

Browse Processors

Showing 72 of 200 components
archive
Processor
Archives all the messages of a batch into a single message according to the selected archive format.
ParsingUtility
avro
Processor
Performs Avro based operations on messages based on a schema.
ParsingTransform
awk
Processor
Executes an AWK program on messages. This processor is very powerful as it offers a range of custom functions for querying and mutating message conten...
Mapping
aws_dynamodb_partiql
Processor
:::caution EXPERIMENTAL
IntegrationAWSCloud+2
aws_lambda
Processor
Invokes an AWS lambda for each message. The contents of the message is the payload of the request, and the result of the invocation will become the ne...
IntegrationAWSCloud
aws_s3
Processor
Performs an S3 GetObject operation using the `bucket` + `key` provided in the config and replaces the original message parts with the content retrieve...
ServicesAWSCloud+1
azure_cosmosdb
Processor
Creates or updates messages as JSON documents in Azure CosmosDB.
AzureCloud
bounds_check
Processor
Removes messages (and batches) that do not fit within certain size boundaries.
Utility
branch
Processor
The `branch` processor allows you to create a new request message via a Bloblang mapping, execute a list of processors on the request messages, and, f...
CompositionTransformRouting
cache
Processor
Performs operations against a cache resource for each message, allowing you to store or retrieve data within message payloads.
Integration
cached
Processor
Cache the result of applying one or more processors to messages identified by a key. If the key already exists within the cache the contents of the me...
Utility
catch
Processor
Applies a list of child processors _only_ when a previous processing step has failed.
Composition
command
Processor
Executes a command for each message.
Integration
compress
Processor
Compresses messages according to the selected algorithm. Supported compression algorithms are: [flate gzip lz4 pgzip snappy zlib]
Parsing
couchbase
Processor
:::caution EXPERIMENTAL
Integration
decompress
Processor
Decompresses messages according to the selected algorithm. Supported decompression algorithms are: [bzip2 flate gzip lz4 pgzip snappy zlib]
Parsing
dedupe
Processor
Deduplicates messages by storing a key value in a cache using the `add` operator. If the key already exists within the cache it is dropped.
Utility
for_each
Processor
A processor that applies a list of child processors to messages of a batch as though they were each a batch of one message.
Composition
gcp_bigquery_select
Processor
:::caution EXPERIMENTAL
IntegrationGCPCloud
grok
Processor
Parses messages into a structured format by attempting to apply a list of Grok expressions, the first expression to result in at least one value repla...
Parsing
group_by
Processor
Splits a batch of messages into N batches, where each resulting batch contains a group of messages determined by a Bloblang query.
Composition
group_by_value
Processor
Splits a batch of messages into N batches, where each resulting batch contains a group of messages determined by a function interpolated string evalua...
Composition
http
Processor
Performs an HTTP request using a message batch as the request body, and replaces the original message parts with the body of the response.
IntegrationNetworkHTTP
insert_part
Processor
Insert a new message into a batch at an index. If the specified index is greater than the length of the existing batch it will be appended to the end.
Composition
javascript
Processor
:::caution EXPERIMENTAL
Mapping
jmespath
Processor
Executes a JMESPath query on JSON documents and replaces the message with the resulting document.
Mapping
jq
Processor
Transforms and filters messages using jq queries.
Mapping
json_schema
Processor
Checks messages against a provided JSONSchema definition but does not change the payload under any circumstances. If a message does not match the sche...
MappingParsingTransform
log
Processor
Prints a log event for each message. Messages always remain unchanged. The log message can be set using function interpolations described here which a...
Utility
mapping
Processor
Executes a Bloblang mapping on messages, creating a new document that replaces (or filters) the original message.
MappingParsingTransform+1
metric
Processor
Emit custom metrics by extracting values from messages.
Utility
mongodb
Processor
Performs operations against MongoDB for each message, allowing you to store or retrieve data within message payloads.
ServicesDatabaseNoSQL
msgpack
Processor
:::caution BETA
Parsing
mutation
Processor
Executes a Bloblang mapping and directly transforms the contents of messages, mutating (or deleting) them.
MappingParsing
nats_kv
Processor
Perform operations on a NATS key-value bucket.
ServicesMessagingStreaming
nats_object_store
Processor
:::caution EXPERIMENTAL
ServicesMessagingStreaming
nats_request_reply
Processor
Sends a message to a NATS subject and expects a reply, from a NATS subscriber acting as a responder, back.
ServicesMessagingStreaming
nlp_classify_text
Processor
:::caution BETA
Machine LearningNLP
nlp_classify_tokens
Processor
:::caution BETA
Machine LearningNLP
nlp_extract_features
Processor
:::caution BETA
Machine LearningNLP
nlp_zero_shot_classify
Processor
:::caution BETA
Machine LearningNLP
noop
Processor
Noop is a processor that does nothing, the message passes through unchanged. Why? Sometimes doing nothing is the braver option.
opensnowcat
Processor
:::caution EXPERIMENTAL
Parsing
parallel
Processor
A processor that applies a list of child processors to messages of a batch as though they were each a batch of one message (similar to the `for_each` ...
Composition
parquet_decode
Processor
:::caution EXPERIMENTAL
Parsing
parquet_encode
Processor
:::caution EXPERIMENTAL
Parsing
parse_log
Processor
Parses common log formats into structured data. This is easier and often much faster than `grok`.
Parsing
processors
Processor
A processor grouping several sub-processors.
Composition
protobuf
Processor
Performs conversions to or from a protobuf message. This processor uses reflection, meaning conversions can be made directly from the target .proto fi...
ParsingTransform
rate_limit
Processor
Throttles the throughput of a pipeline according to a specified `rate_limit` resource. Rate limits are shared across components and therefore apply gl...
Utility
redis
Processor
Performs actions against Redis that aren't possible using a `cache` processor. Actions are
IntegrationMessagingCaching
redis_script
Processor
Performs actions against Redis using LUA scripts.
IntegrationMessagingCaching
resource
Processor
Resource is a processor type that runs a processor resource identified by its label.
Utility
retry
Processor
Attempts to execute a series of child processors until success.
Composition
schema_registry_decode
Processor
Automatically decodes and validates messages with schemas from a Confluent Schema Registry service.
ParsingIntegration
schema_registry_encode
Processor
Automatically encodes and validates messages with schemas from a Confluent Schema Registry service.
ParsingIntegration
select_parts
Processor
Cherry pick a set of messages from a batch by their index. Indexes larger than the number of messages are simply ignored.
Utility
sentry_capture
Processor
:::caution EXPERIMENTAL
sleep
Processor
Sleep for a period of time specified as a duration string for each message. This processor will interpolate functions within the `duration` field, you...
Utility
split
Processor
Breaks message batches (synonymous with multiple part messages) into smaller batches. The size of the resulting batches are determined either by a dis...
Utility
sql_insert
Processor
Inserts rows into an SQL database for each message, and leaves the message unchanged.
Integration
sql_raw
Processor
Runs an arbitrary SQL query against a database and (optionally) returns the result as an array of objects, one for each row returned.
Integration
sql_select
Processor
Runs an SQL select query against a database and returns the result as an array of objects, one for each row returned, containing a key for each column...
Integration
subprocess
Processor
Executes a command as a subprocess and, for each message, will pipe its contents to the stdin stream of the process followed by a newline.
Integration
switch
Processor
Conditionally processes messages based on their contents.
CompositionTransformRouting
sync_response
Processor
Adds the payload in its current state as a synchronous response to the input source, where it is dealt with according to that specific input type.
Utility
try
Processor
Executes a list of child processors on messages only if no prior processors have failed (or the errors have been cleared).
Composition
unarchive
Processor
Unarchives messages according to the selected archive format into multiple messages within a batch.
ParsingUtility
wasm
Processor
:::caution EXPERIMENTAL
Utility
while
Processor
A processor that checks a Bloblang query against each batch of messages and executes child processors on them for as long as the query resolves to tru...
Composition
workflow
Processor
Executes a topology of [`branch` processors][processors.branch], performing them in parallel where possible.
Composition
xml
Processor
:::caution BETA
ParsingTransform