Pipeline Configuration
Expanso Edge pipelines are configured in a YAML file that consists of a number of root sections. The most common arrangement uses just the core input, pipeline, and output sections:
input:
kafka:
addresses: [ localhost:9092 ]
topics: [ foo, bar ]
consumer_group: foogroup
pipeline:
processors:
- mapping: |
root.message = this
root.meta.link_count = this.links.length()
output:
aws_s3:
bucket: my-data-bucket
path: '${! metadata("kafka_topic") }/${! json("message.id") }.json'
The full set of root sections also includes an optional buffer and lists of reusable resources:
input:
kafka:
addresses: [ localhost:9092 ]
topics: [ foo, bar ]
consumer_group: foogroup
buffer:
none: {}
pipeline:
processors:
- mapping: |
root.message = this
root.meta.link_count = this.links.length()
output:
aws_s3:
bucket: my-data-bucket
path: '${! metadata("kafka_topic") }/${! json("message.id") }.json'
input_resources: []
cache_resources: []
processor_resources: []
rate_limit_resources: []
output_resources: []
Most sections represent a component type, which you can read about in more detail in the component documentation.
These types are hierarchical. For example, an input can have a list of child processor types attached to it, which in turn can have their own processor children.
This is powerful but can potentially lead to large and cumbersome configuration files. This document outlines features and patterns to help with writing and managing complex pipeline configurations.
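For example, a minimal sketch of that hierarchy, with a Kafka input carrying its own processors and a for_each processor nesting further children (the topic, field names, and component choices here are illustrative):
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ foo ]
  # processors attached directly to the input
  processors:
    - for_each:
        # a processor with processor children of its own
        - mapping: |
            root = this
            root.ingested_at = now()
output:
  stdout: {}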
Customising Your Configuration
Sometimes it's useful to write a configuration where certain fields can be defined during deployment. For this purpose Expanso Edge supports environment variable interpolation, allowing you to set fields in your config with environment variables like so:
input:
kafka:
addresses:
- ${KAFKA_BROKER:localhost:9092}
topics:
- ${KAFKA_TOPIC:default-topic}
The value after the first colon (localhost:9092 and default-topic above) acts as a fallback when the environment variable isn't set, which makes it easy to share configuration files across different deployment environments.
Reusing Configuration Snippets
Sometimes it's necessary to use a rather large component multiple times. Instead of copy/pasting the configuration or using YAML anchors, you can define your component as a resource.
In the following example we want to make an HTTP request with our payloads. Occasionally the payload might get rejected due to garbage within its contents, and so we catch these rejected requests, attempt to "cleanse" the contents and try to make the same HTTP request again. Since the HTTP request component is quite large (and likely to change over time) we make sure to avoid duplicating it by defining it as a resource get_foo:
pipeline:
processors:
- resource: get_foo
- catch:
- mapping: |
root = this
root.content = this.content.strip_html()
- resource: get_foo
processor_resources:
- label: get_foo
http:
url: http://example.com/foo
verb: POST
headers:
SomeThing: "set-to-this"
SomeThingElse: "set-to-something-else"
You can find out more about configuration resources in the resources document.
Templating
Resources can only be instantiated with a single configuration, which means they aren't suitable for cases where the configuration is required in multiple places but with slightly different parameters.
Expanso Edge supports templates, with which it's possible to define a custom configuration schema and a template for building a configuration from that schema. You can read more about templates in this guide.
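To give a flavour of what a template looks like, here is a minimal sketch of a template file that wraps a Redis cache behind a single url field; the template name, field, and TTL below are illustrative, and the full schema is covered in the templates guide:
name: cached_redis
type: cache
fields:
  - name: url
    type: string
mapping: |
  root.redis.url = this.url
  root.redis.default_ttl = "60s"
Once registered, cached_redis can be used anywhere a cache component is expected, exposing only the url field.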
Configuration Structure
A pipeline configuration consists of these main sections:
Core Components
- input: Where data comes from (inputs)
- pipeline: How data is transformed (processors)
- output: Where data goes (outputs)
- buffer: Optional buffering between input and output (buffers)
Resources
Resources are reusable components that can be referenced throughout your configuration; a short sketch follows the list below:
- input_resources: Reusable input definitions
- cache_resources: Cache components for lookups and state (caches)
- processor_resources: Reusable processor definitions
- rate_limit_resources: Rate limiting components (rate limits)
- output_resources: Reusable output definitions
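For instance, output and rate limit resources can be declared once and then referenced by label elsewhere in the config; the labels, bucket, and limits below are illustrative:
rate_limit_resources:
  - label: slow_api
    local:
      count: 100
      interval: 1s
output_resources:
  - label: archive
    aws_s3:
      bucket: archive-bucket
      path: '${! uuid_v4() }.json'
output:
  resource: archive
Rate limit resources are referenced by name from components that expose a rate_limit field, such as the http processor shown earlier.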
Advanced Features
- Environment interpolation: Use ${ENV_VAR} syntax
- Bloblang queries: Use ${! bloblang } for dynamic values
- Metadata: Access and manipulate message metadata
- Error handling: Catch and handle errors gracefully (see the sketch after this list)
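As a quick illustration of the last three items, the following sketch (assuming a Kafka input like the ones above; the log message text is illustrative) copies the source topic from metadata onto each message and drops anything that fails to parse:
pipeline:
  processors:
    - mapping: |
        root = content().parse_json()
        root.topic = metadata("kafka_topic")
    - catch:
        - log:
            level: WARN
            message: 'Dropping malformed message: ${! error() }'
        - mapping: root = deleted()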
Pipeline Patterns
Simple Pipeline
The most basic pipeline: input → process → output
input:
file:
paths: [ /data/logs/*.log ]
pipeline:
processors:
- mapping: 'root = content().parse_json()'
output:
kafka:
addresses: [ localhost:9092 ]
topic: processed_logs
Pipeline with Resources
Reuse complex components:
input:
file:
paths: [ /data/*.json ]
pipeline:
processors:
- resource: enrich_data
- resource: validate_schema
processor_resources:
- label: enrich_data
branch:
processors:
- cache:
resource: user_cache
operator: get
key: ${! json("user_id") }
result_map: 'root.user = this'
- label: validate_schema
json_schema:
schema_path: /schemas/output.json
cache_resources:
- label: user_cache
redis:
url: redis://localhost:6379
output:
aws_s3:
bucket: processed-data
path: ${! timestamp_unix() }.json
Pipeline with Buffering
Handle backpressure:
input:
kafka:
addresses: [ localhost:9092 ]
topics: [ events ]
buffer:
memory:
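    # limit is the maximum amount of buffered data (in bytes) before backpressure is applied upstream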
limit: 10000
pipeline:
processors:
    - mapping: |
        root = this
        root.processed_at = now()
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost/db
    table: events
    # illustrative columns and mapping for the inserted rows
    columns: [ processed_at, doc ]
    args_mapping: 'root = [ this.processed_at, content().string() ]'
Configuration Best Practices
- Start simple: Begin with minimal configuration and add complexity as needed
- Use resources: Extract repeated components into resources
- Environment variables: Make configs portable across environments
- Add comments: YAML comments document your pipeline logic (see the sketch after this list)
- Version control: Store configurations in git
- Test incrementally: Validate each processor step-by-step
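For instance, combining the comment and environment variable practices (the variable names mirror the interpolation example earlier):
input:
  kafka:
    # broker and topic are injected per environment, with local defaults
    addresses: [ "${KAFKA_BROKER:localhost:9092}" ]
    topics: [ "${KAFKA_TOPIC:default-topic}" ]
    consumer_group: foogroup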
Next Steps
- Resources: Learn about reusable components
- Interpolation: Dynamic configuration with environment variables
- Batching: Optimize throughput with message batching
- Error Handling: Handle failures gracefully
- Metadata: Work with message metadata
- Templating: Create configuration templates