Database Patterns
Common patterns for integrating databases with Expanso Edge pipelines.
Edge Database Caching
Cache cloud data locally at the edge for fast access and offline capability.
When to use:
- Reduce cloud database query costs
- Enable offline operation
- Minimize latency for frequently accessed data
Example: Cache a product catalog from cloud MySQL to a local SQLite database:
# Sync product data from cloud to edge
input:
  sql_select:
    driver: mysql
    dsn: user:pass@tcp(cloud-mysql.example.com:3306)/products_db
    table: products
    columns: ['id', 'name', 'price', 'stock', 'updated_at']
    where: updated_at > ?
    args_mapping: 'root = [ meta("last_sync").or(now().ts_sub("24h")) ]'

pipeline:
  processors:
    - mapping: |
        root.product_id = this.id
        root.name = this.name
        root.price_cents = (this.price * 100).round()
        root.in_stock = this.stock > 0
        root.synced_at = now()

output:
  broker:
    pattern: fan_out
    outputs:
      # Write to local SQLite cache
      - sql_insert:
          driver: sqlite
          dsn: 'file:///var/lib/expanso/cache.db?_journal_mode=WAL'
          table: products
          columns: ['product_id', 'name', 'price_cents', 'in_stock', 'synced_at']
          args_mapping: 'root = [ this.product_id, this.name, this.price_cents, this.in_stock, this.synced_at ]'
          suffix: 'ON CONFLICT(product_id) DO UPDATE SET name=excluded.name, price_cents=excluded.price_cents, in_stock=excluded.in_stock, synced_at=excluded.synced_at'
          init_statement: |
            CREATE TABLE IF NOT EXISTS products (
              product_id TEXT PRIMARY KEY,
              name TEXT NOT NULL,
              price_cents INTEGER NOT NULL,
              in_stock BOOLEAN NOT NULL,
              synced_at DATETIME NOT NULL
            );
          batching:
            count: 100
            period: 5s
      # Track sync time
      - mapping: 'meta last_sync = this.synced_at'
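After a sync pass you can sanity-check the cache directly on the device. A minimal query against the SQLite file and the products table created by the init_statement above (a verification sketch, not part of the pipeline):

-- Confirm recently synced rows landed in the local cache
SELECT product_id, name, price_cents, in_stock, synced_at
FROM products
ORDER BY synced_at DESC
LIMIT 10;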
Query the cache:
input:
  http_server:
    path: /api/products/:id

pipeline:
  processors:
    - mapping: 'root.product_id = meta("http_server_request_path").re_replace("/api/products/", "")'
    - sql_select:
        driver: sqlite
        dsn: 'file:///var/lib/expanso/cache.db?mode=ro'
        table: products
        columns: ['product_id', 'name', 'price_cents', 'in_stock']
        where: product_id = ?
        args_mapping: 'root = [ this.product_id ]'
        result_codec: json
    - mapping: |
        root = if this.length() > 0 {
          this.index(0).assign({"price": this.index(0).price_cents / 100})
        } else {
          {"error": "Product not found"}
        }

output:
  sync_response: {}
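The sql_select processor assembles its statement from the table, columns, and where fields, so each request above issues roughly the following query, with the price reconstructed from price_cents the same way the final mapping does (shown for illustration only):

-- Approximate per-request lookup, with price derived from cents
SELECT product_id, name, price_cents / 100.0 AS price, in_stock
FROM products
WHERE product_id = ?;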
Database-to-Database Replication
Replicate data between databases for multi-cloud, backup, or analytics.
When to use:
- Multi-cloud deployments
- Disaster recovery
- Separate operational and analytical databases
- Data migration
Example: Replicate orders from PostgreSQL to MySQL:
input:
  sql_select:
    driver: postgres
    dsn: postgres://user:pass@source-db:5432/orders?sslmode=require
    table: orders
    columns: ['order_id', 'customer_id', 'amount', 'status', 'created_at']
    where: created_at > $1 AND replicated = false
    args_mapping: 'root = [ now().ts_sub("1h") ]'

output:
  broker:
    pattern: fan_out
    outputs:
      # Write to destination MySQL
      - sql_insert:
          driver: mysql
          dsn: user:pass@tcp(dest-db:3306)/orders_replica
          table: orders
          columns: ['order_id', 'customer_id', 'amount', 'status', 'created_at']
          args_mapping: |
            root = [
              this.order_id,
              this.customer_id,
              this.amount,
              this.status,
              this.created_at
            ]
          batching:
            count: 500
            period: 10s
      # Mark as replicated in source
      - sql_raw:
          driver: postgres
          dsn: postgres://user:pass@source-db:5432/orders?sslmode=require
          query: 'UPDATE orders SET replicated = true WHERE order_id = $1'
          args_mapping: 'root = [ this.order_id ]'
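Both the where clause and the sql_raw output assume the source orders table carries a replicated flag, which is not part of a default schema. One way to add it in PostgreSQL; the index name is arbitrary and the partial index is an optional extra to keep the polling query cheap:

-- One-time migration on the source database
ALTER TABLE orders ADD COLUMN replicated BOOLEAN NOT NULL DEFAULT false;

-- Optional: index only rows still waiting to be replicated
CREATE INDEX orders_unreplicated_idx ON orders (created_at) WHERE replicated = false;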
Data Enrichment with Database Lookups
Enrich streaming data by querying databases for additional context.
When to use:
- Add user details to events
- Look up product information
- Join streaming data with reference tables
- Real-time data augmentation
Example: Enrich events with user and product data from different databases:
input:
  kafka:
    addresses: ['localhost:9092']
    topics: ['user-events']

pipeline:
  processors:
    - mapping: 'root = this.parse_json()'
    # Look up user from PostgreSQL (branch keeps the original event intact)
    - branch:
        processors:
          - sql_select:
              driver: postgres
              dsn: postgres://user:pass@users-db:5432/users?sslmode=disable
              table: users
              columns: ['name', 'email', 'tier']
              where: user_id = $1
              args_mapping: 'root = [ this.user_id ]'
              result_codec: json
        result_map: 'root.user = if this.length() > 0 { this.index(0) } else { null }'
    # Look up product from MySQL
    - branch:
        processors:
          - sql_select:
              driver: mysql
              dsn: user:pass@tcp(products-db:3306)/catalog
              table: products
              columns: ['name', 'price', 'category']
              where: product_id = ?
              args_mapping: 'root = [ this.product_id ]'
              result_codec: json
        result_map: 'root.product = if this.length() > 0 { this.index(0) } else { null }'
    # Combine enriched data
    - mapping: |
        root.event_id = this.event_id
        root.user_name = this.user.name.or("unknown")
        root.user_tier = this.user.tier.or("free")
        root.product_name = this.product.name.or("unknown")
        root.product_price = this.product.price.or(0)
        root.timestamp = now()

output:
  stdout: {}
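The lookups assume users (PostgreSQL) and products (MySQL) reference tables keyed by user_id and product_id, so each per-event query is a primary-key hit. A sketch of matching schemas; the column types and sizes are assumptions:

-- Hypothetical reference tables backing the lookups above
CREATE TABLE users (
  user_id VARCHAR(64)  PRIMARY KEY,
  name    VARCHAR(255) NOT NULL,
  email   VARCHAR(255) NOT NULL,
  tier    VARCHAR(32)  NOT NULL DEFAULT 'free'
);

CREATE TABLE products (
  product_id VARCHAR(64)    PRIMARY KEY,
  name       VARCHAR(255)   NOT NULL,
  price      DECIMAL(10, 2) NOT NULL,
  category   VARCHAR(64)
);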
Aggregate Before Cloud Insert
Reduce cloud database costs and load by aggregating data at the edge.
When to use:
- High-volume metrics or events
- Reduce cloud database write costs
- Summarize time-series data
- Pre-compute analytics
Example: Aggregate 5-minute metrics before writing to cloud:
input:
  kafka:
    addresses: ['localhost:9092']
    topics: ['metrics']

buffer:
  system_window:
    timestamp_mapping: 'root = this.timestamp.ts_parse("2006-01-02T15:04:05Z")'
    size: 5m

pipeline:
  processors:
    - group_by_value:
        value: '${! json("metric_name") }'
    - mapping: |
        root.metric_name = @group_key
        root.min_value = this.map_each(v -> v.value).min()
        root.max_value = this.map_each(v -> v.value).max()
        root.avg_value = this.map_each(v -> v.value).average()
        root.count = this.length()
        root.window_start = @window_start_timestamp
        root.window_end = @window_end_timestamp

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@cloud-metrics:5432/metrics?sslmode=require
    table: metric_aggregates
    columns: ['metric_name', 'min_value', 'max_value', 'avg_value', 'count', 'window_start', 'window_end']
    args_mapping: |
      root = [
        this.metric_name,
        this.min_value,
        this.max_value,
        this.avg_value,
        this.count,
        this.window_start,
        this.window_end
      ]
    batching:
      count: 50
      period: 30s
Impact: Reduces database writes by 95%+ (e.g., 10,000 individual metrics → 200 aggregated records per 5-minute window).
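The pipeline writes to metric_aggregates but does not create it. A possible shape for the destination table, matching the columns in the args_mapping above (types and the primary key are assumptions):

-- Assumed destination table for the aggregated window records
CREATE TABLE IF NOT EXISTS metric_aggregates (
  metric_name  TEXT             NOT NULL,
  min_value    DOUBLE PRECISION NOT NULL,
  max_value    DOUBLE PRECISION NOT NULL,
  avg_value    DOUBLE PRECISION NOT NULL,
  count        BIGINT           NOT NULL,
  window_start TIMESTAMPTZ      NOT NULL,
  window_end   TIMESTAMPTZ      NOT NULL,
  PRIMARY KEY (metric_name, window_start)
);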
Transaction-Safe Batch Inserts
Use batching for better performance while maintaining transactional safety.
When to use:
- High-throughput inserts
- Need all-or-nothing semantics
- Reduce database load
Example:
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    table: events
    columns: ['event_id', 'user_id', 'event_type', 'timestamp']
    args_mapping: 'root = [ this.id, this.user_id, this.type, now() ]'
    batching:
      count: 1000   # Insert 1000 rows in a single transaction
      period: 10s   # Or flush every 10 seconds, whichever comes first
Performance: batched inserts are typically 10-100x faster than issuing individual inserts.
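The gain comes from each batch being written in one round trip as a single multi-row statement rather than one INSERT per message. Conceptually, a batch of three events becomes something like the following; the literal values are placeholders:

-- One round trip for the whole batch instead of one per row
INSERT INTO events (event_id, user_id, event_type, timestamp) VALUES
  ('evt-1', 'user-1', 'click', '2024-01-01T00:00:00Z'),
  ('evt-2', 'user-2', 'view',  '2024-01-01T00:00:01Z'),
  ('evt-3', 'user-1', 'click', '2024-01-01T00:00:02Z');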
Offline Queueing
Buffer data locally when cloud connectivity is unavailable.
When to use:
- Unreliable network connectivity
- Ensure no data loss
- Handle cloud outages gracefully
Example: Queue to SQLite when cloud PostgreSQL is unavailable:
input:
  http_server:
    path: /sensor-data

output:
  fallback:
    # Try cloud database first
    - sql_insert:
        driver: postgres
        dsn: postgres://user:pass@cloud-db:5432/iot?sslmode=require
        table: sensor_readings
        columns: ['sensor_id', 'temperature', 'timestamp']
        args_mapping: 'root = [ this.sensor_id, this.temperature, now() ]'
    # Fall back to local SQLite queue
    - sql_insert:
        driver: sqlite
        dsn: 'file:///var/lib/expanso/queue.db?_journal_mode=WAL'
        table: pending_uploads
        columns: ['sensor_id', 'temperature', 'timestamp', 'queued_at']
        args_mapping: 'root = [ this.sensor_id, this.temperature, now(), now() ]'
        init_statement: |
          CREATE TABLE IF NOT EXISTS pending_uploads (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            sensor_id TEXT NOT NULL,
            temperature REAL NOT NULL,
            timestamp DATETIME NOT NULL,
            queued_at DATETIME NOT NULL,
            uploaded BOOLEAN DEFAULT 0
          );
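While the link is down, the backlog accumulates in pending_uploads. A quick way to check queue depth on the device, using the schema created above (a monitoring sketch, not part of the pipeline):

-- How many readings are still waiting to be uploaded, and since when?
SELECT COUNT(*) AS pending, MIN(queued_at) AS oldest
FROM pending_uploads
WHERE uploaded = 0;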
Then process the queue when connectivity returns:
input:
  sql_select:
    driver: sqlite
    dsn: 'file:///var/lib/expanso/queue.db'
    table: pending_uploads
    columns: ['id', 'sensor_id', 'temperature', 'timestamp']
    where: uploaded = 0
    order_by: id ASC

output:
  broker:
    pattern: fan_out
    outputs:
      # Upload to cloud
      - sql_insert:
          driver: postgres
          dsn: postgres://user:pass@cloud-db:5432/iot?sslmode=require
          table: sensor_readings
          columns: ['sensor_id', 'temperature', 'timestamp']
          args_mapping: 'root = [ this.sensor_id, this.temperature, this.timestamp ]'
          batching:
            count: 100
            period: 5s
      # Mark as uploaded
      - sql_raw:
          driver: sqlite
          dsn: 'file:///var/lib/expanso/queue.db?_journal_mode=WAL'
          query: 'UPDATE pending_uploads SET uploaded = 1 WHERE id = ?'
          args_mapping: 'root = [ this.id ]'
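Rows marked uploaded = 1 stay in the SQLite file, so a periodic cleanup keeps it from growing without bound. A minimal sketch; the 7-day retention window is an assumption:

-- Prune rows that were uploaded more than 7 days ago
DELETE FROM pending_uploads
WHERE uploaded = 1
  AND datetime(queued_at) < datetime('now', '-7 days');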
Next Steps
- Best Practices - Performance tips and optimization
- Database Components - Full component reference
- MySQL Guide - MySQL-specific features
- PostgreSQL Guide - PostgreSQL-specific features
- SQLite Guide - SQLite edge patterns