Pipeline Configuration

Expanso Edge pipelines are configured in a YAML file that consists of a number of root sections, arranged like so:

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ foo, bar ]
    consumer_group: foogroup

pipeline:
  processors:
    - mapping: |
        root.message = this
        root.meta.link_count = this.links.length()

output:
  aws_s3:
    bucket: my-data-bucket
    path: '${! metadata("kafka_topic") }/${! json("message.id") }.json'

Most sections represent a component type, which you can read about in more detail in the component documentation.

These types are hierarchical. For example, an input can have a list of child processor types attached to it, which in turn can have their own processor children.
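
As a minimal sketch of that nesting, the kafka input from the first example could carry its own processors that run before the main pipeline section (the uppercase mapping here is purely illustrative):

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ foo, bar ]
  processors:
    - mapping: 'root = content().string().uppercase()'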

This is powerful but can lead to large and cumbersome configuration files. This document outlines features and patterns that help you write and manage complex pipeline configurations.

Customising Your Configuration

Sometimes it's useful to write a configuration where certain fields can be defined during deployment. For this purpose Expanso Edge supports environment variable interpolation, allowing you to set fields in your config with environment variables like so:

input:
  kafka:
    addresses:
      - ${KAFKA_BROKER:localhost:9092}
    topics:
      - ${KAFKA_TOPIC:default-topic}

This is very useful for sharing configuration files across different deployment environments.
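
Interpolation is not limited to inputs; the output from the first example could be parameterised the same way. A sketch, where the S3_BUCKET variable name is illustrative:

output:
  aws_s3:
    bucket: ${S3_BUCKET:my-data-bucket}
    path: '${! metadata("kafka_topic") }/${! json("message.id") }.json'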

Reusing Configuration Snippets

Sometimes it's necessary to use a rather large component multiple times. Instead of copy/pasting the configuration or using YAML anchors, you can define your component as a resource.

In the following example we want to make an HTTP request with our payloads. Occasionally a payload gets rejected due to garbage within its contents, so we catch these rejected requests, attempt to "cleanse" the contents, and try the same HTTP request again. Since the HTTP request component is quite large (and likely to change over time), we avoid duplicating it by defining it as a resource named get_foo:

pipeline:
  processors:
    - resource: get_foo
    - catch:
        - mapping: |
            root = this
            root.content = this.content.strip_html()
        - resource: get_foo

processor_resources:
  - label: get_foo
    http:
      url: http://example.com/foo
      verb: POST
      headers:
        SomeThing: "set-to-this"
        SomeThingElse: "set-to-something-else"

You can find out more about configuration resources in the resources document.

Templating

Resources can only be instantiated with a single configuration, which means they aren't suitable for cases where the configuration is required in multiple places but with slightly different parameters.

Expanso Edge supports templates, with which it's possible to define a custom configuration schema and a template for building a configuration from that schema. You can read more about templates in this guide.
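
As a rough sketch of the idea, assuming a name/type/fields/mapping layout (the exact schema is covered in the templates guide, and the names below are purely illustrative), a processor template might look like this:

name: cleansed_post
type: processor
fields:
  - name: url
    type: string
mapping: |
  root.http = {
    "url": this.url,
    "verb": "POST"
  }

The intent is that, once registered, the template's name can be used in a pipeline like any other processor type, with url supplied per use.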

Configuration Structure

A pipeline configuration consists of these main sections:

Core Components

  • input: Where data comes from (inputs)
  • pipeline: How data is transformed (processors)
  • output: Where data goes (outputs)
  • buffer: Optional buffering between input and output (buffers)

Resources

Resources are reusable components that can be referenced throughout your configuration:

  • input_resources: Reusable input definitions
  • cache_resources: Cache components for lookups and state (caches)
  • processor_resources: Reusable processor definitions
  • rate_limit_resources: Rate limiting components (rate limits)
  • output_resources: Reusable output definitions
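
Resource sections sit at the root of the configuration alongside input, pipeline and output, and components refer to a resource by its label. A minimal sketch, assuming inputs reference resources with the same resource keyword shown for processors earlier (the label is illustrative):

input:
  resource: main_feed

input_resources:
  - label: main_feed
    kafka:
      addresses: [ localhost:9092 ]
      topics: [ events ]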

Advanced Features

  • Environment interpolation: Use ${ENV_VAR} syntax
  • Bloblang queries: Use ${! bloblang } for dynamic values
  • Metadata: Access and manipulate message metadata
  • Error handling: Catch and handle errors gracefully
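
For example, error handling commonly pairs the catch processor shown earlier with the error() function. A minimal sketch, assuming try and log processors with the fields shown (the log message is illustrative):

pipeline:
  processors:
    - try:
        - mapping: 'root = content().parse_json()'
    - catch:
        - log:
            level: ERROR
            message: 'Failed to parse message: ${! error() }'
        - mapping: 'root = deleted()'

Here the final mapping drops any message that could not be parsed instead of sending it downstream.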

Pipeline Patterns

Simple Pipeline

The most basic pipeline: input → process → output

input:
  file:
    paths: [ /data/logs/*.log ]

pipeline:
  processors:
    - mapping: 'root = content().parse_json()'

output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: processed_logs

Pipeline with Resources

Reuse complex components:

input:
  file:
    paths: [ /data/*.json ]

pipeline:
  processors:
    - resource: enrich_data
    - resource: validate_schema

processor_resources:
  - label: enrich_data
    branch:
      processors:
        - cache:
            resource: user_cache
            operator: get
            key: ${! json("user_id") }
      result_map: 'root.user = this'

  - label: validate_schema
    json_schema:
      schema_path: /schemas/output.json

cache_resources:
  - label: user_cache
    redis:
      url: redis://localhost:6379

output:
  aws_s3:
    bucket: processed-data
    path: ${! timestamp_unix() }.json

Pipeline with Buffering

Handle backpressure:

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]

buffer:
  memory:
    limit: 10000

pipeline:
  processors:
    - mapping: 'root.processed_at = now()'

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost/db
    table: events

Configuration Best Practices

  1. Start simple: Begin with minimal configuration and add complexity as needed
  2. Use resources: Extract repeated components into resources
  3. Environment variables: Make configs portable across environments
  4. Add comments: YAML comments document your pipeline logic
  5. Version control: Store configurations in git
  6. Test incrementally: Validate each processor step-by-step

Next Steps