# Database Connectivity
Connect Expanso Edge to SQL databases to read, transform, and write data at the edge. Whether you're using cloud databases like MySQL and PostgreSQL, or local embedded databases like SQLite, Expanso provides powerful components for database integration.
## Why Database Integration at the Edge?

Processing data at the edge with database connectivity enables:
- Local Analytics: Query edge databases (SQLite) without cloud dependencies
- Data Enrichment: Join streaming data with reference tables stored locally
- Reduced Latency: Process and filter data before sending to cloud databases
- Offline Capability: Continue operations when cloud connectivity is limited
- Cost Optimization: Aggregate data locally before writing to cloud databases
## Quick Start

### Reading from PostgreSQL

Query a PostgreSQL database and process each row:
```yaml
input:
  sql_select:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/mydb?sslmode=disable
    table: sensor_readings
    columns: ['id', 'temperature', 'humidity', 'timestamp']
    # The postgres driver uses positional placeholders ($1, $2, ...), not ?
    where: timestamp > $1
    args_mapping: 'root = [ now().ts_sub("1h") ]'

pipeline:
  processors:
    - mapping: |
        root.sensor_id = this.id
        root.temp_f = this.temperature * 9/5 + 32
        root.reading_time = this.timestamp

output:
  stdout: {}
```
### Writing to MySQL

Insert transformed data into MySQL:
```yaml
input:
  http_server:
    path: /webhook

pipeline:
  processors:
    - mapping: |
        root.user_id = this.user.id
        root.event_type = this.event
        root.created_at = now()

output:
  sql_insert:
    driver: mysql
    dsn: user:password@tcp(localhost:3306)/analytics
    table: events
    columns: ['user_id', 'event_type', 'created_at']
    args_mapping: 'root = [ this.user_id, this.event_type, this.created_at ]'
```
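For higher write throughput, batch inserts rather than issuing one `INSERT` per message. A minimal sketch, assuming the standard `batching` field is available on the `sql_insert` output in your Expanso Edge version:

```yaml
output:
  sql_insert:
    driver: mysql
    dsn: user:password@tcp(localhost:3306)/analytics
    table: events
    columns: ['user_id', 'event_type', 'created_at']
    args_mapping: 'root = [ this.user_id, this.event_type, this.created_at ]'
    # Flush up to 100 rows per insert, or whatever has accumulated after 5s.
    batching:
      count: 100
      period: 5s
```

Batching amortizes round trips and transaction overhead, which matters most on high-latency links between the edge and a cloud database.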
### Local SQLite Database

Use SQLite for edge storage and analytics:
```yaml
input:
  kafka:
    addresses: ['localhost:9092']
    topics: ['iot-events']

pipeline:
  processors:
    - mapping: |
        root = this
        root.processed_at = now()

output:
  sql_insert:
    driver: sqlite
    dsn: 'file:///var/lib/expanso/edge.db?_journal_mode=WAL'
    table: event_log
    columns: ['device_id', 'event_type', 'payload', 'processed_at']
    args_mapping: |
      root = [
        this.device_id,
        this.event_type,
        this.payload.format_json(),
        this.processed_at
      ]
    init_statement: |
      CREATE TABLE IF NOT EXISTS event_log (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        device_id TEXT NOT NULL,
        event_type TEXT NOT NULL,
        payload TEXT,
        processed_at DATETIME DEFAULT CURRENT_TIMESTAMP
      );
```
## Supported Databases

Expanso Edge supports the following database drivers:
| Database | Driver | Use Case |
|---|---|---|
| MySQL | mysql | Cloud/on-prem relational database |
| PostgreSQL | postgres | Cloud/on-prem relational database |
| SQLite | sqlite | Embedded edge database (single file) |
| Microsoft SQL Server | mssql | Enterprise relational database |
| ClickHouse | clickhouse | Analytical database for time-series |
| Oracle | oracle | Enterprise relational database |
| Snowflake | snowflake | Cloud data warehouse |
| Trino | trino | Distributed SQL query engine |
| Google Spanner | spanner | Cloud-native distributed database |
| CosmosDB | gocosmos | Azure NoSQL database |
## Connection Strings (DSN)

Each database driver uses a specific Data Source Name (DSN) format:
```yaml
# MySQL
dsn: user:password@tcp(host:3306)/database

# PostgreSQL
dsn: postgres://user:password@host:5432/database?sslmode=disable

# SQLite
dsn: file:///path/to/database.db

# Microsoft SQL Server
dsn: sqlserver://user:password@host:1433?database=mydb

# ClickHouse
dsn: clickhouse://user:password@host:9000/database

# Snowflake
dsn: user:password@account/database?warehouse=compute_wh

# Google Spanner
dsn: projects/my-project/instances/my-instance/databases/my-db
```
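Avoid hard-coding credentials in config files. Assuming your Expanso Edge build supports the usual `${VAR}` environment-variable interpolation in configs (worth verifying for your version), you can inject them at runtime:

```yaml
# DB_USER and DB_PASS are read from the environment when the config is loaded.
dsn: postgres://${DB_USER}:${DB_PASS}@db.internal:5432/mydb?sslmode=require
```

This keeps secrets out of version control and lets the same config run across environments.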
## Next Steps

Explore specific topics for working with databases:
- Component Reference - Detailed documentation for sql_select, sql_insert, and sql_raw components
- Common Patterns - Real-world examples: caching, replication, enrichment, and aggregation
- Best Practices - Performance tips, connection management, and troubleshooting
## Common Questions

### Can I use SQLite on edge devices?
Yes! SQLite is perfect for edge deployments. It's embedded (no separate database process), file-based, and works offline. Use it for local caching, analytics, and data buffering.
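Reading back from the same local file is symmetrical to writing. A sketch, assuming the `event_log` table created in the earlier SQLite example:

```yaml
input:
  sql_select:
    driver: sqlite
    dsn: 'file:///var/lib/expanso/edge.db'
    table: event_log
    columns: ['device_id', 'event_type', 'processed_at']
    # Only fetch rows from the last 24 hours.
    where: processed_at > ?
    args_mapping: 'root = [ now().ts_sub("24h") ]'
```

Because SQLite is a single file, the same database can be written by one pipeline and queried by another on the same device.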
### How do I handle database connection failures?

Use the `retry` output wrapper to automatically retry failed database operations:
```yaml
output:
  retry:
    max_retries: 3
    backoff:
      initial_interval: 1s
      max_interval: 10s
    output:
      sql_insert:
        driver: postgres
        # ... connection details ...
```
### Can I query multiple databases in one pipeline?
Yes! Use multiple sql_select processors to enrich data from different databases. See the data enrichment pattern for examples.
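An enrichment step typically runs `sql_select` as a processor inside a `branch`, joining each message against a reference table. A sketch with illustrative table and column names:

```yaml
pipeline:
  processors:
    # Look up device metadata for each incoming message without
    # overwriting the original payload.
    - branch:
        processors:
          - sql_select:
              driver: postgres
              dsn: postgres://user:pass@host:5432/refdata?sslmode=disable
              table: devices
              columns: ['site', 'model']
              where: device_id = $1
              args_mapping: 'root = [ this.device_id ]'
        # sql_select returns an array of rows; take the first match.
        result_map: 'root.device_meta = this.index(0)'
```

The `branch` wrapper keeps the lookup result separate from the source message, so a failed or empty lookup doesn't clobber the original data.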
### What's the difference between sql_raw and sql_select?

- `sql_select`: Simplified component for basic SELECT queries on a single table
- `sql_raw`: Execute any SQL query (SELECT, INSERT, UPDATE, DELETE, joins, subqueries, etc.)

Use `sql_select` for simple queries and `sql_raw` for complex ones. See the component reference for details.
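When a query outgrows `sql_select`, for example a join across tables, `sql_raw` takes the full statement. A sketch, with illustrative table names:

```yaml
input:
  sql_raw:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/mydb?sslmode=disable
    # Joins are beyond sql_select; sql_raw runs the statement verbatim.
    query: |
      SELECT r.id, r.temperature, d.site
      FROM sensor_readings r
      JOIN devices d ON d.id = r.device_id
      WHERE r.timestamp > NOW() - INTERVAL '1 hour'
```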