SQLite with Expanso Edge
SQLite is the perfect database for edge deployments. It's embedded (no separate process), file-based, lightweight, and works completely offline. Use SQLite for local caching, queueing, analytics, and data buffering at the edge.
Why SQLite at the Edge?
Ideal for edge computing:
- Embedded database: No separate database server required
- Single file: Easy to backup, move, and manage
- Lightweight: Small footprint (the full library is well under 1 MB)
- Reliable: Battle-tested and production-ready
- Offline-first: Works without network connectivity
- Fast: Sub-millisecond queries for local data
- Zero configuration: No setup or administration needed
Perfect for:
- Local data caching
- Edge analytics and aggregation
- Offline queueing and buffering
- Reference data storage
- Session and state management
Connection String (DSN)
SQLite connections use file paths:
dsn: file:///path/to/database.db
Examples:
# Absolute path
dsn: 'file:///var/lib/expanso/cache.db'
# Relative path
dsn: 'file://./local.db'
# With Write-Ahead Logging (WAL) mode
dsn: 'file:///var/lib/expanso/cache.db?_journal_mode=WAL'
# Read-only mode
dsn: 'file:///var/lib/expanso/cache.db?mode=ro'
# In-memory database (for testing)
dsn: 'file::memory:?cache=shared'
Add ?_journal_mode=WAL to your DSN for better concurrent performance:
dsn: 'file:///var/lib/expanso/cache.db?_journal_mode=WAL'
WAL mode allows simultaneous reads while writing.
Reading from SQLite
Query a Table
Use sql_select to read from SQLite:
input:
  sql_select:
    driver: sqlite
    dsn: 'file:///var/lib/expanso/cache.db?mode=ro'
    table: products
    columns: ['product_id', 'name', 'price', 'stock']
    where: stock > ?
    args_mapping: 'root = [ 0 ]'
    order_by: name ASC
output:
  stdout: {}
Enrich Streaming Data
Look up cached data from SQLite:
input:
  kafka:
    addresses: ['localhost:9092']
    topics: ['orders']
pipeline:
  processors:
    - mapping: 'root = content().parse_json()'
    # Look up product details from the local SQLite cache, keeping the
    # original order fields by running the query inside a branch
    - branch:
        processors:
          - sql_select:
              driver: sqlite
              dsn: 'file:///var/lib/expanso/product_cache.db?mode=ro'
              table: products
              columns: ['name', 'price', 'category']
              where: product_id = ?
              args_mapping: 'root = [ this.product_id ]'
        result_map: |
          root.product = if this.length() > 0 { this.index(0) } else { null }
    - mapping: |
        root.order_id = this.order_id
        root.product = this.product
        root.product_name = this.product.name.or("unknown")
        root.product_price = this.product.price.or(0)
output:
  stdout: {}
Writing to SQLite
Insert Rows
Use sql_insert to write to SQLite:
input:
  http_server:
    path: /events
pipeline:
  processors:
    - mapping: |
        root.event_id = this.id
        root.event_type = this.type
        # Store the full request body as a JSON string in the payload column
        root.payload = content().string()
        root.created_at = now()
output:
  sql_insert:
    driver: sqlite
    dsn: 'file:///var/lib/expanso/events.db?_journal_mode=WAL'
    table: events
    columns: ['event_id', 'event_type', 'payload', 'created_at']
    args_mapping: |
      root = [
        this.event_id,
        this.event_type,
        this.payload,
        this.created_at
      ]
    init_statement: |
      CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        event_id TEXT NOT NULL UNIQUE,
        event_type TEXT NOT NULL,
        payload TEXT,
        created_at DATETIME NOT NULL
      );
      CREATE INDEX IF NOT EXISTS idx_event_type ON events (event_type);
      CREATE INDEX IF NOT EXISTS idx_created_at ON events (created_at);
    batching:
      count: 100
      period: 5s
Upsert (ON CONFLICT DO UPDATE)
Handle duplicates with SQLite's upsert clause, which updates the existing row instead of inserting a new one:
output:
  sql_insert:
    driver: sqlite
    dsn: 'file:///var/lib/expanso/cache.db?_journal_mode=WAL'
    table: product_cache
    columns: ['product_id', 'name', 'price', 'updated_at']
    args_mapping: |
      root = [
        this.product_id,
        this.name,
        this.price,
        now()
      ]
    suffix: |
      ON CONFLICT(product_id) DO UPDATE SET
        name = excluded.name,
        price = excluded.price,
        updated_at = excluded.updated_at
Common Use Cases
Edge Data Caching
Cache cloud data locally for fast access:
# Sync cloud data to edge SQLite cache
input:
  http_client:
    url: https://api.example.com/products
    verb: GET
pipeline:
  processors:
    # Split the JSON array response into one message per product
    - unarchive:
        format: json_array
    - mapping: |
        root.product_id = this.id
        root.name = this.name
        root.price = this.price
        root.cached_at = now()
output:
  sql_insert:
    driver: sqlite
    dsn: 'file:///var/lib/expanso/cache.db?_journal_mode=WAL'
    table: product_cache
    columns: ['product_id', 'name', 'price', 'cached_at']
    args_mapping: 'root = [ this.product_id, this.name, this.price, this.cached_at ]'
    suffix: |
      ON CONFLICT(product_id) DO UPDATE SET
        name = excluded.name,
        price = excluded.price,
        cached_at = excluded.cached_at
    init_statement: |
      CREATE TABLE IF NOT EXISTS product_cache (
        product_id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        price REAL NOT NULL,
        cached_at DATETIME NOT NULL
      );
    batching:
      count: 100
      period: 5s
Offline Queueing
Buffer data locally when cloud is unavailable:
# Queue sensor data to SQLite when cloud is offline
input:
  http_server:
    path: /sensor-data
output:
  fallback:
    # Try cloud database first
    - sql_insert:
        driver: postgres
        dsn: postgres://user:pass@cloud-db:5432/iot?sslmode=require
        table: sensor_readings
        columns: ['sensor_id', 'temperature', 'humidity', 'timestamp']
        args_mapping: |
          root = [
            this.sensor_id,
            this.temperature,
            this.humidity,
            now()
          ]
        batching:
          count: 50
          period: 10s
    # Fall back to local SQLite queue
    - sql_insert:
        driver: sqlite
        dsn: 'file:///var/lib/expanso/queue.db?_journal_mode=WAL'
        table: pending_uploads
        columns: ['sensor_id', 'temperature', 'humidity', 'timestamp']
        args_mapping: |
          root = [
            this.sensor_id,
            this.temperature,
            this.humidity,
            now()
          ]
        init_statement: |
          CREATE TABLE IF NOT EXISTS pending_uploads (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            sensor_id TEXT NOT NULL,
            temperature REAL NOT NULL,
            humidity REAL NOT NULL,
            timestamp DATETIME NOT NULL,
            uploaded BOOLEAN DEFAULT 0
          );
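When connectivity returns, a companion pipeline can drain the local queue back to the cloud. The sketch below reuses the table and columns from the example above; the sql_raw processor with exec_only, and marking rows as uploaded as soon as they are read rather than after the cloud insert is confirmed, are assumptions to adapt to your delivery guarantees.
# Drain queued rows from SQLite back to the cloud database
input:
  sql_raw:
    driver: sqlite
    dsn: 'file:///var/lib/expanso/queue.db?_journal_mode=WAL'
    query: |
      SELECT id, sensor_id, temperature, humidity, timestamp
      FROM pending_uploads
      WHERE uploaded = 0
pipeline:
  processors:
    # Mark each row as uploaded as soon as it is read (assumption: a row lost
    # on a failed cloud insert is acceptable; otherwise flag rows in a
    # separate pass after the upload succeeds)
    - sql_raw:
        driver: sqlite
        dsn: 'file:///var/lib/expanso/queue.db?_journal_mode=WAL'
        query: UPDATE pending_uploads SET uploaded = 1 WHERE id = ?
        args_mapping: 'root = [ this.id ]'
        exec_only: true
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@cloud-db:5432/iot?sslmode=require
    table: sensor_readings
    columns: ['sensor_id', 'temperature', 'humidity', 'timestamp']
    args_mapping: 'root = [ this.sensor_id, this.temperature, this.humidity, this.timestamp ]'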
Edge Analytics
Aggregate data locally before sending to cloud:
# Collect metrics in SQLite and aggregate before cloud upload
input:
  kafka:
    addresses: ['localhost:9092']
    topics: ['metrics']
output:
  sql_insert:
    driver: sqlite
    dsn: 'file:///var/lib/expanso/metrics.db?_journal_mode=WAL'
    table: raw_metrics
    columns: ['metric_name', 'value', 'timestamp']
    args_mapping: 'root = [ this.metric_name, this.value, now() ]'
    init_statement: |
      CREATE TABLE IF NOT EXISTS raw_metrics (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        metric_name TEXT NOT NULL,
        value REAL NOT NULL,
        timestamp DATETIME NOT NULL,
        aggregated BOOLEAN DEFAULT 0
      );
      CREATE INDEX IF NOT EXISTS idx_metric_timestamp
        ON raw_metrics (metric_name, timestamp);
    batching:
      count: 500
      period: 5s
Then periodically aggregate and upload:
# Aggregate from SQLite and upload to cloud
input:
  sql_raw:
    driver: sqlite
    dsn: 'file:///var/lib/expanso/metrics.db?mode=ro'
    query: |
      SELECT
        metric_name,
        MIN(value) AS min_value,
        MAX(value) AS max_value,
        AVG(value) AS avg_value,
        COUNT(*) AS count,
        MIN(timestamp) AS window_start,
        MAX(timestamp) AS window_end
      FROM raw_metrics
      WHERE aggregated = 0
        AND timestamp < datetime('now', '-5 minutes')
      GROUP BY metric_name
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@cloud-metrics:5432/metrics?sslmode=require
    table: metric_aggregates
    columns: ['metric_name', 'min_value', 'max_value', 'avg_value', 'count', 'window_start', 'window_end']
    args_mapping: |
      root = [
        this.metric_name,
        this.min_value,
        this.max_value,
        this.avg_value,
        this.count,
        this.window_start,
        this.window_end
      ]
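The WHERE aggregated = 0 filter above only works if something flips that flag. One option is an extra processor in the same pipeline, sketched below; the exec_only sql_raw call and flagging rows before the cloud insert is confirmed are assumptions, so move this to a separate confirmation step if you cannot tolerate gaps.
pipeline:
  processors:
    # Flag the raw rows covered by this aggregate so the next run skips them
    - sql_raw:
        driver: sqlite
        dsn: 'file:///var/lib/expanso/metrics.db?_journal_mode=WAL'
        query: |
          UPDATE raw_metrics SET aggregated = 1
          WHERE metric_name = ? AND timestamp <= ?
        args_mapping: 'root = [ this.metric_name, this.window_end ]'
        exec_only: true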
Performance Tips
Always Use WAL Mode
Write-Ahead Logging improves concurrent performance dramatically:
dsn: 'file:///var/lib/expanso/db.db?_journal_mode=WAL'
Benefits:
- Allows simultaneous reads while writing
- Reduces write latency
- Better crash recovery
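You can confirm that a database file is actually using WAL with the sqlite3 CLI:
# Prints "wal" once the mode is active
sqlite3 /var/lib/expanso/db.db "PRAGMA journal_mode;"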
Use Batching for Writes
Batching groups many rows into a single transaction, which is typically 10-100x faster than writing row by row:
output:
  sql_insert:
    driver: sqlite
    dsn: 'file:///var/lib/expanso/db.db?_journal_mode=WAL'
    table: events
    columns: ['data']
    args_mapping: 'root = [ this ]'
    batching:
      count: 1000
      period: 10s
Limit Concurrent Writers
SQLite works best with a single writer:
output:
  sql_insert:
    driver: sqlite
    dsn: 'file:///var/lib/expanso/db.db?_journal_mode=WAL'
    table: events
    columns: ['data']
    args_mapping: 'root = [ this ]'
    conn_max_open: 1 # Single writer
Use Appropriate Indexes
Create indexes for frequently queried columns:
CREATE INDEX IF NOT EXISTS idx_created_at ON events (created_at);
CREATE INDEX IF NOT EXISTS idx_event_type ON events (event_type);
VACUUM Periodically
Reclaim space from deleted records (run manually or via cron):
VACUUM;
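For example, a weekly cron entry (assuming the sqlite3 CLI is installed on the host; adjust the path and schedule to your deployment):
# Run VACUUM every Sunday at 03:00
0 3 * * 0 sqlite3 /var/lib/expanso/cache.db "VACUUM;"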
Database Initialization
Auto-Create Tables and Indexes
Always use init_statement to ensure schema exists:
output:
  sql_insert:
    driver: sqlite
    dsn: 'file:///var/lib/expanso/db.db?_journal_mode=WAL'
    table: sensor_data
    columns: ['device_id', 'temperature', 'humidity', 'timestamp']
    args_mapping: 'root = [ this.device_id, this.temperature, this.humidity, now() ]'
    init_statement: |
      CREATE TABLE IF NOT EXISTS sensor_data (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        device_id TEXT NOT NULL,
        temperature REAL NOT NULL,
        humidity REAL NOT NULL,
        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
      );
      CREATE INDEX IF NOT EXISTS idx_device_timestamp
        ON sensor_data (device_id, timestamp);
      CREATE INDEX IF NOT EXISTS idx_timestamp
        ON sensor_data (timestamp DESC);
File Management
Backup SQLite Database
SQLite databases are single files, which makes them easy to back up:
# Copy the file
cp /var/lib/expanso/cache.db /backup/cache_$(date +%Y%m%d).db
# Or use SQLite backup command
sqlite3 /var/lib/expanso/cache.db ".backup /backup/cache.db"
Check Database Size
ls -lh /var/lib/expanso/cache.db
Analyze Database
sqlite3 /var/lib/expanso/cache.db "PRAGMA integrity_check;"
sqlite3 /var/lib/expanso/cache.db "ANALYZE;"
Troubleshooting
Database is Locked
Error: database is locked
Causes:
- Multiple writers without WAL mode
- Long-running transactions
Solutions:
- Enable WAL mode: dsn: 'file:///path/to/db.db?_journal_mode=WAL'
- Set conn_max_open: 1 on SQLite outputs so only one writer is open
- Reduce transaction duration with smaller batches
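If the SQLite driver in your build supports it (treat this parameter as an assumption; it is driver-dependent), a busy timeout in the DSN lets writers wait briefly for a lock instead of failing immediately:
dsn: 'file:///path/to/db.db?_journal_mode=WAL&_busy_timeout=5000'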
File Permission Denied
Error: unable to open database file
Solutions:
- Ensure the directory exists: mkdir -p /var/lib/expanso
- Check directory permissions: chmod 755 /var/lib/expanso
- Verify file permissions: chmod 644 /var/lib/expanso/cache.db
- Check that the user running Expanso Edge has write access to the directory
Disk Full
Error: database or disk is full
Solutions:
- Check disk space: df -h
- Clean up old data: DELETE FROM events WHERE created_at < datetime('now', '-30 days'); then run VACUUM;
- Rotate databases by date or size
Corrupted Database
Error: database disk image is malformed
Solutions:
- Restore from backup
- Try recovery: sqlite3 corrupted.db ".recover" | sqlite3 recovered.db
- Prevent corruption with WAL mode and proper shutdowns
Next Steps
- Database Components - Full component reference
- Common Patterns - More real-world examples
- Best Practices - Performance and optimization tips
- PostgreSQL Guide - For cloud database integration
- MySQL Guide - For cloud database integration