
Database Patterns

Common patterns for integrating databases with Expanso Edge pipelines.

Edge Database Caching

Cache cloud data locally at the edge for fast access and offline capability.

When to use:

  • Reduce cloud database query costs
  • Enable offline operation
  • Minimize latency for frequently-accessed data

Example: Cache product catalog from cloud MySQL to local SQLite:

# Sync product data from cloud to edge
input:
  sql_select:
    driver: mysql
    dsn: user:pass@tcp(cloud-mysql.example.com:3306)/products_db
    table: products
    columns: ['id', 'name', 'price', 'stock', 'updated_at']
    where: updated_at > ?
    args_mapping: 'root = [ meta("last_sync").or(now().ts_sub("24h")) ]'

pipeline:
  processors:
    - mapping: |
        root.product_id = this.id
        root.name = this.name
        root.price_cents = (this.price * 100).round()
        root.in_stock = this.stock > 0
        root.synced_at = now()

output:
  broker:
    pattern: fan_out
    outputs:
      # Write to local SQLite cache
      - sql_insert:
          driver: sqlite
          dsn: 'file:///var/lib/expanso/cache.db?_journal_mode=WAL'
          table: products
          columns: ['product_id', 'name', 'price_cents', 'in_stock', 'synced_at']
          args_mapping: 'root = [ this.product_id, this.name, this.price_cents, this.in_stock, this.synced_at ]'
          suffix: 'ON CONFLICT(product_id) DO UPDATE SET name=excluded.name, price_cents=excluded.price_cents, in_stock=excluded.in_stock, synced_at=excluded.synced_at'
          init_statement: |
            CREATE TABLE IF NOT EXISTS products (
              product_id TEXT PRIMARY KEY,
              name TEXT NOT NULL,
              price_cents INTEGER NOT NULL,
              in_stock BOOLEAN NOT NULL,
              synced_at DATETIME NOT NULL
            );
          batching:
            count: 100
            period: 5s

      # Track sync time
      - mapping: 'meta last_sync = this.synced_at'
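
The sql_select input typically executes its query once and then the stream completes, so the sync above runs as a one-shot job. To repeat it on a schedule, one option is to drive the same query from a timer. The sketch below is an assumption-laden variant, not part of the original example: it assumes a Benthos-style generate input, the sql_select processor, and the unarchive processor are available in your Expanso Edge build, and the one-hour cadence is illustrative:

input:
  generate:
    interval: 1h           # hypothetical sync cadence
    mapping: 'root = {}'   # empty trigger message

pipeline:
  processors:
    # Run the same SELECT as above; as a processor, sql_select replaces
    # the trigger message with an array of matching rows
    - sql_select:
        driver: mysql
        dsn: user:pass@tcp(cloud-mysql.example.com:3306)/products_db
        table: products
        columns: ['id', 'name', 'price', 'stock', 'updated_at']
        where: updated_at > ?
        args_mapping: 'root = [ now().ts_sub("1h") ]'
    # Split the result array into one message per row, then continue
    # with the same mapping and outputs shown above
    - unarchive:
        format: json_array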

Query the cache:

input:
  http_server:
    path: /api/products/:id

pipeline:
  processors:
    - mapping: 'root.product_id = meta("http_server_request_path").re_replace("/api/products/", "")'

    - sql_select:
        driver: sqlite
        dsn: 'file:///var/lib/expanso/cache.db?mode=ro'
        table: products
        columns: ['product_id', 'name', 'price_cents', 'in_stock']
        where: product_id = ?
        args_mapping: 'root = [ this.product_id ]'
        result_codec: json

    - mapping: |
        root = if this.length() > 0 {
          this.index(0).assign({"price": this.index(0).price_cents / 100})
        } else {
          {"error": "Product not found"}
        }

output:
  sync_response: {}

Database-to-Database Replication

Replicate data between databases for multi-cloud deployments, backups, or analytics.

When to use:

  • Multi-cloud deployments
  • Disaster recovery
  • Separate operational and analytical databases
  • Data migration

Example: Replicate orders from PostgreSQL to MySQL:

input:
  sql_select:
    driver: postgres
    dsn: postgres://user:pass@source-db:5432/orders?sslmode=require
    table: orders
    columns: ['order_id', 'customer_id', 'amount', 'status', 'created_at']
    where: created_at > $1 AND replicated = false
    args_mapping: 'root = [ now().ts_sub("1h") ]'

output:
  broker:
    pattern: fan_out
    outputs:
      # Write to destination MySQL
      - sql_insert:
          driver: mysql
          dsn: user:pass@tcp(dest-db:3306)/orders_replica
          table: orders
          columns: ['order_id', 'customer_id', 'amount', 'status', 'created_at']
          args_mapping: |
            root = [
              this.order_id,
              this.customer_id,
              this.amount,
              this.status,
              this.created_at
            ]
          batching:
            count: 500
            period: 10s

      # Mark as replicated in source
      - sql_raw:
          driver: postgres
          dsn: postgres://user:pass@source-db:5432/orders?sslmode=require
          query: 'UPDATE orders SET replicated = true WHERE order_id = $1'
          args_mapping: 'root = [ this.order_id ]'

Data Enrichment with Database Lookups

Enrich streaming data by querying databases for additional context.

When to use:

  • Add user details to events
  • Look up product information
  • Join streaming data with reference tables
  • Real-time data augmentation

Example: Enrich events with user and product data from different databases:

input:
  kafka:
    addresses: ['localhost:9092']
    topics: ['user-events']

pipeline:
  processors:
    - mapping: 'root = content().parse_json()'

    # Look up user from PostgreSQL; branch keeps the original event
    # intact while the lookup result is mapped onto root.user
    - branch:
        processors:
          - sql_select:
              driver: postgres
              dsn: postgres://user:pass@users-db:5432/users?sslmode=disable
              table: users
              columns: ['name', 'email', 'tier']
              where: user_id = $1
              args_mapping: 'root = [ this.user_id ]'
              result_codec: json
        result_map: 'root.user = if this.length() > 0 { this.index(0) } else { null }'

    # Look up product from MySQL
    - branch:
        processors:
          - sql_select:
              driver: mysql
              dsn: user:pass@tcp(products-db:3306)/catalog
              table: products
              columns: ['name', 'price', 'category']
              where: product_id = ?
              args_mapping: 'root = [ this.product_id ]'
              result_codec: json
        result_map: 'root.product = if this.length() > 0 { this.index(0) } else { null }'

    # Combine enriched data
    - mapping: |
        root.event_id = this.event_id
        root.user_name = this.user.name.or("unknown")
        root.user_tier = this.user.tier.or("free")
        root.product_name = this.product.name.or("unknown")
        root.product_price = this.product.price.or(0)
        root.timestamp = now()

output:
  stdout: {}
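
When the same user or product IDs recur frequently in the stream, every event still triggers two database queries. One way to cut that load is to memoize the lookups. This is a sketch rather than part of the original example: it assumes a Benthos-style cached processor and an in-memory cache resource are available in your build, and the lookup_cache name and TTLs are illustrative:

pipeline:
  processors:
    - branch:
        processors:
          # Re-use the lookup result for a repeated user_id for up to 60s
          - cached:
              cache: lookup_cache
              key: 'user-${! json("user_id") }'
              ttl: 60s
              processors:
                - sql_select:
                    driver: postgres
                    dsn: postgres://user:pass@users-db:5432/users?sslmode=disable
                    table: users
                    columns: ['name', 'email', 'tier']
                    where: user_id = $1
                    args_mapping: 'root = [ this.user_id ]'
        result_map: 'root.user = if this.length() > 0 { this.index(0) } else { null }'

cache_resources:
  - label: lookup_cache
    memory:
      default_ttl: 5m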

Aggregate Before Cloud Insert

Reduce cloud database costs and load by aggregating data at the edge.

When to use:

  • High-volume metrics or events
  • Reduce cloud database write costs
  • Summarize time-series data
  • Pre-compute analytics

Example: Aggregate metrics into 5-minute windows before writing to the cloud:

input:
  kafka:
    addresses: ['localhost:9092']
    topics: ['metrics']

buffer:
  system_window:
    timestamp_mapping: 'root = this.timestamp.ts_parse("2006-01-02T15:04:05Z")'
    size: 5m

pipeline:
  processors:
    - group_by_value:
        value: '${! json("metric_name") }'

    - mapping: |
        let values = json("value").from_all()
        root = if batch_index() == 0 {
          {
            "metric_name": this.metric_name,
            "min_value": $values.min(),
            "max_value": $values.max(),
            "avg_value": $values.sum() / $values.length(),
            "count": $values.length(),
            "window_start": @window_start_timestamp,
            "window_end": @window_end_timestamp
          }
        } else { deleted() }

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@cloud-metrics:5432/metrics?sslmode=require
    table: metric_aggregates
    columns: ['metric_name', 'min_value', 'max_value', 'avg_value', 'count', 'window_start', 'window_end']
    args_mapping: |
      root = [
        this.metric_name,
        this.min_value,
        this.max_value,
        this.avg_value,
        this.count,
        this.window_start,
        this.window_end
      ]
    batching:
      count: 50
      period: 30s

Impact: Reduces database writes by 95%+ (e.g., 10,000 individual metrics → 200 aggregated records per 5-minute window).


Transaction-Safe Batch Inserts

Use batching for better performance while maintaining transactional safety.

When to use:

  • High-throughput inserts
  • Need all-or-nothing semantics
  • Reduce database load

Example:

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    table: events
    columns: ['event_id', 'user_id', 'event_type', 'timestamp']
    args_mapping: 'root = [ this.id, this.user_id, this.type, now() ]'
    batching:
      count: 1000   # Insert up to 1000 rows in a single transaction
      period: 10s   # Or flush every 10 seconds, whichever comes first

Performance: batched inserts are typically 10-100x faster than inserting rows one at a time.
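
A batching policy can also flush on payload size or on a Bloblang condition rather than only on count and period. The sketch below shows the additional fields; the check expression and its event_type value are hypothetical, and field support should be confirmed for your version:

batching:
  count: 1000          # flush after 1000 messages...
  byte_size: 1048576   # ...or once roughly 1 MiB of payload accumulates...
  period: 10s          # ...or after 10 seconds, whichever happens first
  check: 'this.event_type == "session_end"'   # ...or when a message matches this condition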


Offline Queueing

Buffer data locally when cloud connectivity is unavailable.

When to use:

  • Unreliable network connectivity
  • Ensure no data loss
  • Handle cloud outages gracefully

Example: Queue to SQLite when cloud PostgreSQL is unavailable:

input:
  http_server:
    path: /sensor-data

output:
  fallback:
    # Try cloud database first
    - sql_insert:
        driver: postgres
        dsn: postgres://user:pass@cloud-db:5432/iot?sslmode=require
        table: sensor_readings
        columns: ['sensor_id', 'temperature', 'timestamp']
        args_mapping: 'root = [ this.sensor_id, this.temperature, now() ]'

    # Fall back to local SQLite queue
    - sql_insert:
        driver: sqlite
        dsn: 'file:///var/lib/expanso/queue.db?_journal_mode=WAL'
        table: pending_uploads
        columns: ['sensor_id', 'temperature', 'timestamp', 'queued_at']
        args_mapping: 'root = [ this.sensor_id, this.temperature, now(), now() ]'
        init_statement: |
          CREATE TABLE IF NOT EXISTS pending_uploads (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            sensor_id TEXT NOT NULL,
            temperature REAL NOT NULL,
            timestamp DATETIME NOT NULL,
            queued_at DATETIME NOT NULL,
            uploaded BOOLEAN DEFAULT 0
          );

Then process the queue when connectivity returns:

input:
  sql_select:
    driver: sqlite
    dsn: 'file:///var/lib/expanso/queue.db'
    table: pending_uploads
    columns: ['id', 'sensor_id', 'temperature', 'timestamp']
    where: uploaded = 0
    order_by: id ASC

output:
  broker:
    pattern: fan_out
    outputs:
      # Upload to cloud
      - sql_insert:
          driver: postgres
          dsn: postgres://user:pass@cloud-db:5432/iot?sslmode=require
          table: sensor_readings
          columns: ['sensor_id', 'temperature', 'timestamp']
          args_mapping: 'root = [ this.sensor_id, this.temperature, this.timestamp ]'
          batching:
            count: 100
            period: 5s

      # Mark as uploaded
      - sql_raw:
          driver: sqlite
          dsn: 'file:///var/lib/expanso/queue.db?_journal_mode=WAL'
          query: 'UPDATE pending_uploads SET uploaded = 1 WHERE id = ?'
          args_mapping: 'root = [ this.id ]'
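
Rows marked uploaded = 1 stay in the local queue, so the SQLite file grows over time. A small housekeeping pipeline can prune them periodically. This is a sketch, not part of the original pattern: it assumes a Benthos-style generate input and the sql_raw processor (with its exec_only flag) are available, and the hourly cadence is illustrative:

input:
  generate:
    interval: 1h
    mapping: 'root = {}'

pipeline:
  processors:
    # Delete rows that have already been uploaded to the cloud
    - sql_raw:
        driver: sqlite
        dsn: 'file:///var/lib/expanso/queue.db?_journal_mode=WAL'
        query: 'DELETE FROM pending_uploads WHERE uploaded = 1'
        exec_only: true   # no result set is expected from the DELETE

output:
  drop: {}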

Next Steps