Database Components
Expanso Edge provides three SQL components for reading and writing database data: sql_select, sql_raw, and sql_insert. This page is a quick reference covering common configurations.
Reading Data
sql_select Input
Query a database table and create messages from rows. Best for simple SELECT queries on a single table.
Basic usage:
input:
  sql_select:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db
    table: users
    columns: ['id', 'name', 'email']
    where: created_at > ?
    args_mapping: 'root = [ now().ts_sub("24h") ]'
Key parameters:
- driver: Database driver (mysql, postgres, sqlite, etc.)
- dsn: Connection string (Data Source Name)
- table: Table name to query
- columns: Columns to select (use ['*'] for all columns)
- where: Optional WHERE clause with ? placeholders
- args_mapping: Bloblang mapping that returns an array of arguments for the ? placeholders (see the sketch below)
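The where clause and args_mapping work together: each ? placeholder is filled, in order, by the corresponding element of the array the mapping returns. A minimal sketch with two placeholders (the orders table and its columns are illustrative, not part of the examples above):

input:
  sql_select:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db
    table: orders
    columns: ['id', 'status', 'total']
    where: status = ? AND created_at > ?
    args_mapping: 'root = [ "pending", now().ts_sub("1h") ]'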
sql_raw Input
Execute arbitrary SQL queries including joins, subqueries, and complex SELECT statements.
Basic usage:
input:
  sql_raw:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/analytics
    query: |
      SELECT u.id, u.name, COUNT(o.id) as order_count
      FROM users u
      LEFT JOIN orders o ON u.id = o.user_id
      WHERE u.created_at > ?
      GROUP BY u.id, u.name
    args_mapping: 'root = [ now().ts_sub("7d") ]'
Key parameters:
- query: Complete SQL query with ? placeholders
- args_mapping: Bloblang mapping that returns an array of arguments (a query without placeholders is sketched below)
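When the query contains no ? placeholders, there is nothing for args_mapping to supply, so it can be left out. A minimal sketch, assuming args_mapping is optional in that case (the table name is illustrative):

input:
  sql_raw:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/analytics
    query: SELECT id, name, email FROM active_users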
sql_select Processor
Enrich messages by querying a database. Useful for lookups against reference data.
Basic usage:
pipeline:
  processors:
    - sql_select:
        driver: sqlite
        dsn: 'file:///var/lib/reference.db'
        table: product_catalog
        columns: ['product_name', 'category', 'price']
        where: product_id = ?
        args_mapping: 'root = [ this.product_id ]'
        result_codec: json
Key parameters:
- result_codec: How to structure results (json recommended)
- All other parameters are the same as the sql_select input
Result handling:
The processor replaces the message content with query results as an array:
[
  {"product_name": "Widget", "category": "Hardware", "price": 19.99}
]
Use a mapping processor afterwards to extract the result you need:
pipeline:
  processors:
    - sql_select:
        # ... query config ...
        result_codec: json
    - mapping: |
        # Extract first result or null if no results
        root.product = if this.length() > 0 { this.index(0) } else { null }
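To keep the original message intact and attach the lookup result to a field instead of replacing the content, the lookup can be wrapped in a branch processor. This is a sketch, assuming the standard branch processor from the underlying stream-processing engine is available in Expanso Edge:

pipeline:
  processors:
    - branch:
        processors:
          - sql_select:
              driver: sqlite
              dsn: 'file:///var/lib/reference.db'
              table: product_catalog
              columns: ['product_name', 'category', 'price']
              where: product_id = ?
              args_mapping: 'root = [ this.product_id ]'
              result_codec: json
        result_map: |
          # Merge the first matching row (or null) back into the original message
          root.product = if this.length() > 0 { this.index(0) } else { null }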
Writing Data
sql_insert Output
Insert messages as rows into a database table. Supports batching for performance.
Basic usage:
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db
    table: events
    columns: ['user_id', 'event_type', 'timestamp']
    args_mapping: |
      root = [
        this.user_id,
        this.event_type,
        now()
      ]
With batching:
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db
    table: events
    columns: ['user_id', 'event_type', 'timestamp']
    args_mapping: 'root = [ this.user_id, this.event_type, now() ]'
    batching:
      count: 100   # Insert 100 rows at once
      period: 5s   # Or every 5 seconds
Key parameters:
- table: Table name
- columns: Column names, in order
- args_mapping: Bloblang mapping that returns an array of values matching the columns
- batching: Optional batching configuration for better performance
- suffix: Optional SQL suffix (e.g., an ON CONFLICT clause for upserts; see the sketch below)
- init_statement: Optional SQL to run on startup (e.g., CREATE TABLE)
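The suffix parameter can turn the generated INSERT into an upsert. A minimal sketch for Postgres, assuming a device_state table with a unique constraint on device_id (both are illustrative, not part of the examples above):

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db
    table: device_state
    columns: ['device_id', 'status', 'updated_at']
    args_mapping: 'root = [ this.device_id, this.status, now() ]'
    # Appended after the generated INSERT statement
    suffix: |
      ON CONFLICT (device_id) DO UPDATE SET
        status = EXCLUDED.status,
        updated_at = EXCLUDED.updated_at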
sql_raw Output
Execute arbitrary SQL commands including INSERT, UPDATE, DELETE, and upserts.
Basic usage:
output:
  sql_raw:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/db
    query: |
      INSERT INTO user_stats (user_id, login_count, last_login)
      VALUES (?, ?, ?)
      ON DUPLICATE KEY UPDATE
        login_count = login_count + 1,
        last_login = VALUES(last_login)
    args_mapping: |
      root = [
        this.user_id,
        1,
        now()
      ]
Key parameters:
- query: Complete SQL statement with ? placeholders
- args_mapping: Bloblang mapping that returns an array of arguments
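Because the statement is arbitrary, the same output also covers deletes. A sketch that removes the row referenced by the incoming message (the sessions table and session_id field are illustrative):

output:
  sql_raw:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db
    query: DELETE FROM sessions WHERE session_id = ?
    args_mapping: 'root = [ this.session_id ]'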
Connection Management
Connection Pooling
Configure connection pools for better performance:
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db
    table: events
    columns: ['data']
    args_mapping: 'root = [ this.format_json() ]'
    conn_max_open: 20        # Max 20 open connections
    conn_max_idle: 5         # Keep 5 idle connections ready
    conn_max_idle_time: 5m   # Close idle connections after 5 minutes
    conn_max_life_time: 30m  # Recycle connections after 30 minutes
Connection Verification
Verify database connectivity on startup:
input:
  sql_select:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/db
    table: users
    columns: ['*']
    init_verify_conn: true   # Ping database on startup
Database Initialization
Auto-Create Tables
Use init_statement to create tables on startup:
output:
  sql_insert:
    driver: sqlite
    dsn: 'file:///var/lib/edge.db'
    table: sensor_data
    columns: ['device_id', 'temperature', 'humidity', 'timestamp']
    args_mapping: 'root = [ this.device_id, this.temp, this.humidity, now() ]'
    init_statement: |
      CREATE TABLE IF NOT EXISTS sensor_data (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        device_id TEXT NOT NULL,
        temperature REAL NOT NULL,
        humidity REAL NOT NULL,
        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
      );
      CREATE INDEX IF NOT EXISTS idx_device_timestamp
        ON sensor_data (device_id, timestamp);
Run Initialization Scripts
Load SQL files on startup:
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db
    table: events
    columns: ['event_type', 'payload']
    args_mapping: 'root = [ this.type, this.data.format_json() ]'
    init_files:
      - /etc/expanso/schema/001_create_tables.sql
      - /etc/expanso/schema/002_create_indexes.sql
Cloud Authentication
AWS RDS with IAM
Connect to AWS RDS using IAM authentication:
output:
  sql_insert:
    driver: postgres
    dsn: postgres://iamuser@my-rds-instance.region.rds.amazonaws.com:5432/mydb
    table: logs
    columns: ['message', 'level']
    args_mapping: 'root = [ this.msg, this.level ]'
    iam_enabled: true
    region: us-east-1
AWS Secrets Manager
Store credentials securely in AWS Secrets Manager:
output:
  sql_insert:
    driver: mysql
    dsn: tcp(my-rds-instance.region.rds.amazonaws.com:3306)/mydb
    table: events
    columns: ['data']
    args_mapping: 'root = [ this.format_json() ]'
    secret_name: prod/mysql/credentials
    region: us-east-1
Azure AD Authentication
Use Azure AD authentication with Azure Database for PostgreSQL:
input:
  sql_select:
    driver: postgres
    dsn: postgres://user@your-server.postgres.database.azure.com:5432/db?sslmode=require
    table: users
    columns: ['id', 'email']
    azure:
      entra_enabled: true
      token_request_options:
        tenant_id: "your-tenant-id"
Next Steps
- Common Patterns - Real-world examples for caching, replication, enrichment, and aggregation
- Best Practices - Performance tips, troubleshooting, and optimization
- Bloblang Guide - Learn data transformation for args_mapping