
Database Components

Expanso Edge provides three SQL component types — sql_select, sql_raw, and sql_insert — for reading and writing database data. This page is a quick reference with common configurations.

Reading Data

sql_select Input

Query a database table and create messages from rows. Best for simple SELECT queries on a single table.

Basic usage:

input:
  sql_select:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db
    table: users
    columns: ['id', 'name', 'email']
    where: created_at > ?
    args_mapping: 'root = [ now().ts_sub("24h") ]'

Key parameters:

  • driver: Database driver (mysql, postgres, sqlite, etc.)
  • dsn: Connection string (Data Source Name)
  • table: Table name to query
  • columns: Columns to select (use ['*'] for all columns)
  • where: Optional WHERE clause with ? placeholders
  • args_mapping: Bloblang mapping that returns array of arguments for ? placeholders
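DSN syntax varies by driver. A quick sketch collecting the formats used by the examples on this page (hosts, ports, and credentials are placeholders):

```yaml
# postgres -- URL style:
#   dsn: postgres://user:pass@localhost:5432/db
# mysql -- Go MySQL driver style; note the tcp(...) wrapper around the host:
#   dsn: user:pass@tcp(localhost:3306)/db
# sqlite -- a file URI or path:
#   dsn: 'file:///var/lib/edge.db'
```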

Full documentation →


sql_raw Input

Execute arbitrary SQL queries including joins, subqueries, and complex SELECT statements.

Basic usage:

input:
  sql_raw:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/analytics
    query: |
      SELECT u.id, u.name, COUNT(o.id) AS order_count
      FROM users u
      LEFT JOIN orders o ON u.id = o.user_id
      WHERE u.created_at > ?
      GROUP BY u.id, u.name
    args_mapping: 'root = [ now().ts_sub("7d") ]'

Key parameters:

  • query: Complete SQL query with ? placeholders
  • args_mapping: Bloblang mapping that returns array of arguments

Full documentation →


sql_select Processor

Enrich messages by querying a database. Useful for lookups and data enrichment.

Basic usage:

pipeline:
  processors:
    - sql_select:
        driver: sqlite
        dsn: 'file:///var/lib/reference.db'
        table: product_catalog
        columns: ['product_name', 'category', 'price']
        where: product_id = ?
        args_mapping: 'root = [ this.product_id ]'
        result_codec: json

Key parameters:

  • result_codec: How to structure results (json recommended)
  • All other parameters are the same as the sql_select input

Result handling:

The processor replaces the message content with query results as an array:

[
  {"product_name": "Widget", "category": "Hardware", "price": 19.99}
]

Follow it with a mapping processor to extract the results:

pipeline:
  processors:
    - sql_select:
        # ... query config ...
        result_codec: json

    - mapping: |
        # Extract the first result, or null if there were no results
        root.product = if this.length() > 0 { this.index(0) } else { null }
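Since the processor overwrites the message content, a common pattern is to run the lookup inside a branch processor so the original payload is preserved and only the enrichment field is merged in. A sketch reusing the product lookup above:

```yaml
pipeline:
  processors:
    - branch:
        # The lookup runs against a copy of the message...
        processors:
          - sql_select:
              driver: sqlite
              dsn: 'file:///var/lib/reference.db'
              table: product_catalog
              columns: ['product_name', 'category', 'price']
              where: product_id = ?
              args_mapping: 'root = [ this.product_id ]'
              result_codec: json
        # ...and only the first row is merged back into the original message.
        result_map: 'root.product = if this.length() > 0 { this.index(0) } else { null }'
```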

Full documentation →


Writing Data

sql_insert Output

Insert messages as rows into a database table. Supports batching for performance.

Basic usage:

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db
    table: events
    columns: ['user_id', 'event_type', 'timestamp']
    args_mapping: |
      root = [
        this.user_id,
        this.event_type,
        now()
      ]

With batching:

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db
    table: events
    columns: ['user_id', 'event_type', 'timestamp']
    args_mapping: 'root = [ this.user_id, this.event_type, now() ]'
    batching:
      count: 100   # Insert 100 rows at once
      period: 5s   # Or every 5 seconds

Key parameters:

  • table: Table name
  • columns: Column names in order
  • args_mapping: Bloblang mapping that returns array of values matching columns
  • batching: Optional batching configuration for better performance
  • suffix: Optional SQL suffix (e.g., ON CONFLICT clause for upserts)
  • init_statement: Optional SQL to run on startup (e.g., CREATE TABLE)
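For example, a suffix can turn the insert into a PostgreSQL upsert. A sketch assuming a hypothetical user_stats table with a unique constraint on user_id:

```yaml
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db
    table: user_stats
    columns: ['user_id', 'login_count']
    args_mapping: 'root = [ this.user_id, 1 ]'
    # Assumes user_id is unique; EXCLUDED refers to the row that failed to insert.
    suffix: |
      ON CONFLICT (user_id) DO UPDATE SET
        login_count = user_stats.login_count + EXCLUDED.login_count
```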

Full documentation →


sql_raw Output

Execute arbitrary SQL commands including INSERT, UPDATE, DELETE, and upserts.

Basic usage:

output:
  sql_raw:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/db
    query: |
      INSERT INTO user_stats (user_id, login_count, last_login)
      VALUES (?, ?, ?)
      ON DUPLICATE KEY UPDATE
        login_count = login_count + 1,
        last_login = VALUES(last_login)
    args_mapping: |
      root = [
        this.user_id,
        1,
        now()
      ]

Key parameters:

  • query: Complete SQL statement with ? placeholders
  • args_mapping: Bloblang mapping that returns array of arguments
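The equivalent upsert in PostgreSQL syntax would look roughly like this, with ON CONFLICT in place of MySQL's ON DUPLICATE KEY UPDATE. Note that raw placeholder style is driver-dependent: Postgres expects $1, $2, ... rather than ?:

```yaml
output:
  sql_raw:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db
    query: |
      INSERT INTO user_stats (user_id, login_count, last_login)
      VALUES ($1, $2, $3)
      ON CONFLICT (user_id) DO UPDATE SET
        login_count = user_stats.login_count + 1,
        last_login = EXCLUDED.last_login
    args_mapping: 'root = [ this.user_id, 1, now() ]'
```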

Full documentation →


Connection Management

Connection Pooling

Configure connection pools for better performance:

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db
    table: events
    columns: ['data']
    args_mapping: 'root = [ this.format_json() ]'
    conn_max_open: 20        # Max 20 open connections
    conn_max_idle: 5         # Keep 5 idle connections ready
    conn_max_idle_time: 5m   # Close idle connections after 5 minutes
    conn_max_life_time: 30m  # Recycle connections after 30 minutes

Connection Verification

Verify database connectivity on startup:

input:
  sql_select:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/db
    table: users
    columns: ['*']
    init_verify_conn: true   # Ping database on startup

Database Initialization

Auto-Create Tables

Use init_statement to create tables on startup:

output:
  sql_insert:
    driver: sqlite
    dsn: 'file:///var/lib/edge.db'
    table: sensor_data
    columns: ['device_id', 'temperature', 'humidity', 'timestamp']
    args_mapping: 'root = [ this.device_id, this.temp, this.humidity, now() ]'
    init_statement: |
      CREATE TABLE IF NOT EXISTS sensor_data (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        device_id TEXT NOT NULL,
        temperature REAL NOT NULL,
        humidity REAL NOT NULL,
        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
      );
      CREATE INDEX IF NOT EXISTS idx_device_timestamp
        ON sensor_data (device_id, timestamp);

Run Initialization Scripts

Load SQL files on startup:

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db
    table: events
    columns: ['event_type', 'payload']
    args_mapping: 'root = [ this.type, this.data.format_json() ]'
    init_files:
      - /etc/expanso/schema/001_create_tables.sql
      - /etc/expanso/schema/002_create_indexes.sql

Cloud Authentication

AWS RDS with IAM

Connect to AWS RDS using IAM authentication:

output:
  sql_insert:
    driver: postgres
    dsn: postgres://iamuser@my-rds-instance.region.rds.amazonaws.com:5432/mydb
    table: logs
    columns: ['message', 'level']
    args_mapping: 'root = [ this.msg, this.level ]'
    iam_enabled: true
    region: us-east-1

AWS Secrets Manager

Store credentials securely in AWS Secrets Manager:

output:
  sql_insert:
    driver: mysql
    dsn: tcp(my-rds-instance.region.rds.amazonaws.com:3306)/mydb
    table: events
    columns: ['data']
    args_mapping: 'root = [ this.format_json() ]'
    secret_name: prod/mysql/credentials
    region: us-east-1

Azure AD Authentication

Use Azure AD authentication with Azure Database for PostgreSQL:

input:
  sql_select:
    driver: postgres
    dsn: postgres://user@your-server.postgres.database.azure.com:5432/db?sslmode=require
    table: users
    columns: ['id', 'email']
    azure:
      entra_enabled: true
      token_request_options:
        tenant_id: "your-tenant-id"

Next Steps

  • Common Patterns - Real-world examples for caching, replication, enrichment, and aggregation
  • Best Practices - Performance tips, troubleshooting, and optimization
  • Bloblang Guide - Learn data transformation for args_mapping