cache
Performs operations against a cache resource for each message, allowing you to store or retrieve data within message payloads.
- Common
- Advanced
```yaml
# Common config fields, showing default values
label: ""
cache:
  resource: "" # No default (required)
  operator: "" # No default (required)
  key: "" # No default (required)
  value: "" # No default (optional)
```
```yaml
# All config fields, showing default values
label: ""
cache:
  resource: "" # No default (required)
  operator: "" # No default (required)
  key: "" # No default (required)
  value: "" # No default (optional)
  ttl: 60s # No default (optional)
```
For use cases where you wish to cache the result of processors, consider using the cached processor instead.
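As an illustrative sketch of that alternative (the resource name, key path, and mapping below are hypothetical placeholders, not taken from this page):

```yaml
# Hypothetical: cache the result of an expensive processor per request ID,
# so repeated requests with the same ID skip the work.
pipeline:
  processors:
    - cached:
        cache: foocache
        key: '${! json("request.id") }'
        processors:
          - mapping: 'root = this' # stand-in for an expensive transformation
```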
This processor will interpolate functions within the key and value fields individually for each message. This allows you to specify dynamic keys and values based on the contents of the message payloads and metadata. You can find a list of functions here.
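For instance, a minimal sketch of dynamic keys (the metadata key, JSON path, and resource name here are illustrative assumptions):

```yaml
# Hypothetical: compose the cache key from a metadata field and a payload
# field, and store the full message body as the value.
pipeline:
  processors:
    - cache:
        resource: foocache
        operator: set
        key: '${! meta("kafka_topic") }-${! json("user.id") }'
        value: '${! content() }'
```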
Examples
- Deduplication
- Deduplication Batch-Wide
- Hydration
Deduplication can be done using the add operator with a key extracted from the message payload. Since it fails when the key already exists, we can remove the duplicates using a mapping processor:
```yaml
pipeline:
  processors:
    - cache:
        resource: foocache
        operator: add
        key: '${! json("message.id") }'
        value: "storeme"
    - mapping: root = if errored() { deleted() }

cache_resources:
  - label: foocache
    redis:
      url: tcp://TODO:6379
```
Sometimes it's necessary to deduplicate a batch of messages (AKA a window) by a single identifying value. This can be done by introducing a branch processor, which executes the cache only once on behalf of the batch, in this case with a value made from fields extracted from the first and last messages of the batch:
```yaml
pipeline:
  processors:
    # Try and add one message to a cache that identifies the whole batch
    - branch:
        request_map: |
          root = if batch_index() == 0 {
            json("id").from(0) + json("meta.tail_id").from(-1)
          } else { deleted() }
        processors:
          - cache:
              resource: foocache
              operator: add
              key: ${! content() }
              value: t
    # Delete all messages if we failed
    - mapping: |
        root = if errored().from(0) {
          deleted()
        }
```
It's possible to enrich payloads with content previously stored in a cache by using the branch processor:
```yaml
pipeline:
  processors:
    - branch:
        processors:
          - cache:
              resource: foocache
              operator: get
              key: '${! json("message.document_id") }'
        result_map: 'root.message.document = this'
        # NOTE: If the data stored in the cache is not valid JSON then use
        # something like this instead:
        # result_map: 'root.message.document = content().string()'

cache_resources:
  - label: foocache
    memcached:
      addresses: [ "TODO:11211" ]
```
Fields
resource
The cache resource to target with this processor.
Type: string
operator
The operation to perform with the cache.
Type: string
Options: set, add, get, delete.
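As a sketch of the remaining operator, assuming a cache resource named foocache is declared elsewhere and that the key path is illustrative, delete can be used to evict an entry once a message has been handled:

```yaml
# Hypothetical: remove a record from the cache after it has been processed,
# freeing the key for future messages with the same ID.
pipeline:
  processors:
    - cache:
        resource: foocache
        operator: delete
        key: '${! json("message.id") }'
```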