Inputs
An input is a source of data piped through an array of optional processors:
```yaml
input:
  label: my_redis_input
  redis_streams:
    url: tcp://localhost:6379
    streams:
      - expanso_stream
    body_key: body
    consumer_group: expanso_group

  # Optional list of processing steps
  processors:
    - mapping: |
        root.document = this.without("links")
        root.link_count = this.links.length()
```
Some inputs have a logical end. For example, a csv input ends once the last row is consumed. When this happens the input gracefully terminates, and Expanso Edge will shut itself down once all messages have been fully processed.
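As a sketch, a minimal csv input that reads a file and terminates after the last row might look like this (the file path is a placeholder, and field names should be checked against the csv connector reference):

```yaml
input:
  csv:
    # Hypothetical file path for illustration
    paths: [ ./orders.csv ]
    parse_header_row: true
    delimiter: ","
```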
Brokering
Only one input may be configured at the root of a pipeline config. However, the root input can be a broker, which combines multiple inputs and merges their streams:
```yaml
input:
  broker:
    inputs:
      - kafka:
          addresses: [ TODO ]
          topics: [ foo, bar ]
          consumer_group: foogroup

      - redis_streams:
          url: tcp://localhost:6379
          streams:
            - expanso_stream
          body_key: body
          consumer_group: expanso_group
```
Labels
Inputs have an optional `label` field that uniquely identifies them in observability data such as metrics and logs. This is useful when running configs with multiple inputs; otherwise their metric labels are generated from their composition.
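For example, a labelled input might look like the following (the label value is illustrative, and the exact metric names that carry it depend on the metrics exporter in use):

```yaml
input:
  label: orders_feed
  redis_streams:
    url: tcp://localhost:6379
    streams:
      - orders
    body_key: body
    consumer_group: expanso_group
```

Metrics and log entries for this input would then carry `orders_feed` rather than an auto-generated identifier.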
Database Inputs
Query databases to read data into your pipeline:
- sql_select: Query database tables (MySQL, PostgreSQL, SQLite, etc.)
- sql_raw: Execute custom SQL queries
- mongodb: Query MongoDB collections
- cassandra: Query Cassandra tables
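As a sketch, a sql_select input querying a PostgreSQL table might be configured like this (the DSN, table, and column names are placeholders; verify field names against the sql_select connector reference):

```yaml
input:
  sql_select:
    driver: postgres
    # Hypothetical connection string for illustration
    dsn: postgres://user:password@localhost:5432/mydb
    table: orders
    columns: [ id, customer, created_at ]
    where: id > ?
    args_mapping: root = [ 100 ]
```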
See the Database Connectivity Guide for complete examples, including:
- Reading from PostgreSQL and MySQL
- Using SQLite for edge caching and analytics
- Database-to-database replication
- Cloud database authentication (AWS RDS, Azure)