Database Connectivity

Connect Expanso Edge to SQL databases to read, transform, and write data at the edge. Whether you're using managed cloud or on-prem databases such as MySQL and PostgreSQL, or embedded local databases such as SQLite, Expanso provides powerful components for database integration.

Why Database Integration at the Edge?

Processing data at the edge with database connectivity enables:

  • Local Analytics: Query edge databases (SQLite) without cloud dependencies
  • Data Enrichment: Join streaming data with reference tables stored locally
  • Reduced Latency: Process and filter data before sending to cloud databases
  • Offline Capability: Continue operations when cloud connectivity is limited
  • Cost Optimization: Aggregate data locally before writing to cloud databases

Quick Start

Reading from PostgreSQL

Query a PostgreSQL database and process each row:

input:
  sql_select:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/mydb?sslmode=disable
    table: sensor_readings
    columns: ['id', 'temperature', 'humidity', 'timestamp']
    # The postgres driver expects $1-style placeholders (mysql uses ?)
    where: timestamp > $1
    args_mapping: 'root = [ now().ts_sub("1h") ]'

pipeline:
  processors:
    - mapping: |
        root.sensor_id = this.id
        root.temp_f = this.temperature * 9 / 5 + 32
        root.reading_time = this.timestamp

output:
  stdout: {}

Writing to MySQL

Insert transformed data into MySQL:

input:
  http_server:
    path: /webhook

pipeline:
  processors:
    - mapping: |
        root.user_id = this.user.id
        root.event_type = this.event
        root.created_at = now()

output:
  sql_insert:
    driver: mysql
    dsn: user:password@tcp(localhost:3306)/analytics
    table: events
    columns: ['user_id', 'event_type', 'created_at']
    args_mapping: 'root = [ this.user_id, this.event_type, this.created_at ]'

Local SQLite Database

Use SQLite for edge storage and analytics:

input:
  kafka:
    addresses: ['localhost:9092']
    topics: ['iot-events']

pipeline:
  processors:
    - mapping: |
        root = this
        root.processed_at = now()

output:
  sql_insert:
    driver: sqlite
    dsn: 'file:///var/lib/expanso/edge.db?_journal_mode=WAL'
    table: event_log
    columns: ['device_id', 'event_type', 'payload', 'processed_at']
    args_mapping: |
      root = [
        this.device_id,
        this.event_type,
        this.payload.string(),
        this.processed_at
      ]
    init_statement: |
      CREATE TABLE IF NOT EXISTS event_log (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        device_id TEXT NOT NULL,
        event_type TEXT NOT NULL,
        payload TEXT,
        processed_at DATETIME DEFAULT CURRENT_TIMESTAMP
      );

Supported Databases

Expanso Edge supports the following database drivers:

Database               Driver       Use Case
MySQL                  mysql        Cloud/on-prem relational database
PostgreSQL             postgres     Cloud/on-prem relational database
SQLite                 sqlite       Embedded edge database (single file)
Microsoft SQL Server   mssql        Enterprise relational database
ClickHouse             clickhouse   Analytical database for time-series
Oracle                 oracle       Enterprise relational database
Snowflake              snowflake    Cloud data warehouse
Trino                  trino        Distributed SQL query engine
Google Spanner         spanner      Cloud-native distributed database
CosmosDB               gocosmos     Azure NoSQL database

Connection Strings (DSN)

Each database driver uses a specific Data Source Name (DSN) format:

# MySQL
dsn: user:password@tcp(host:3306)/database

# PostgreSQL
dsn: postgres://user:password@host:5432/database?sslmode=disable

# SQLite
dsn: file:///path/to/database.db

# Microsoft SQL Server
dsn: sqlserver://user:password@host:1433?database=mydb

# ClickHouse
dsn: clickhouse://user:password@host:9000/database

# Snowflake
dsn: user:password@account/database?warehouse=compute_wh

# Google Spanner
dsn: projects/my-project/instances/my-instance/databases/my-db
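Hard-coding credentials in a DSN is risky. Assuming Expanso Edge supports Benthos-style environment variable interpolation (`${VAR}` syntax) in config fields, secrets can instead be injected at runtime. A minimal sketch, with `DB_USER`, `DB_PASSWORD`, and `DB_HOST` as hypothetical variable names:

```yaml
# Sketch: credentials supplied via environment variables
# rather than hard-coded in the pipeline file.
output:
  sql_insert:
    driver: postgres
    dsn: postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/analytics
    table: events
    columns: ['user_id', 'event_type']
    args_mapping: 'root = [ this.user_id, this.event_type ]'
```

Export the variables in the environment of the Expanso Edge process before starting the pipeline.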

Next Steps

Explore specific topics for working with databases:

  • Component Reference - Detailed documentation for sql_select, sql_insert, and sql_raw components
  • Common Patterns - Real-world examples: caching, replication, enrichment, and aggregation
  • Best Practices - Performance tips, connection management, and troubleshooting

Common Questions

Can I use SQLite on edge devices?

Yes! SQLite is perfect for edge deployments. It's embedded (no separate database process), file-based, and works offline. Use it for local caching, analytics, and data buffering.
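As a sketch of local analytics, the `event_log` table from the SQLite example above can be summarized on the device itself. This assumes a `sql_raw` input is available (as in Benthos-style pipelines) and reuses the `/var/lib/expanso/edge.db` path from earlier:

```yaml
# Sketch: aggregate buffered events locally before any cloud round-trip.
input:
  sql_raw:
    driver: sqlite
    dsn: 'file:/var/lib/expanso/edge.db'
    query: |
      SELECT device_id, COUNT(*) AS event_count, MAX(processed_at) AS last_seen
      FROM event_log
      GROUP BY device_id

output:
  stdout: {}
```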

How do I handle database connection failures?

Use the retry output wrapper to automatically retry failed database operations:

output:
  retry:
    max_retries: 3
    backoff:
      initial_interval: 1s
      max_interval: 10s
    output:
      sql_insert:
        driver: postgres
        # ... connection details ...

Can I query multiple databases in one pipeline?

Yes! Use multiple sql_select processors to enrich data from different databases. See the data enrichment pattern for examples.
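As a sketch, one enrichment approach wraps a sql_select processor in a branch so the lookup result is merged back into the original message. The `devices` table and its `device_id`, `name`, and `location` columns here are illustrative:

```yaml
# Sketch: enrich each streamed message with a row from a local reference table.
pipeline:
  processors:
    - branch:
        processors:
          - sql_select:
              driver: sqlite
              dsn: 'file:/var/lib/expanso/edge.db'
              table: devices
              columns: ['name', 'location']
              where: device_id = ?
              args_mapping: 'root = [ this.device_id ]'
        result_map: |
          # sql_select returns an array of rows; take the first match.
          root.device_name = this.0.name
          root.device_location = this.0.location
```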

What's the difference between sql_raw and sql_select?

  • sql_select: Simplified component for basic SELECT queries on a single table
  • sql_raw: Execute any SQL query (SELECT, INSERT, UPDATE, DELETE, joins, subqueries, etc.)

Use sql_select for simple queries, sql_raw for complex queries. See the component reference for details.
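For example, an upsert is beyond sql_select and sql_insert but straightforward with sql_raw. This sketch assumes a hypothetical PostgreSQL `device_totals` table with a unique `device_id` column; note the $1-style placeholders expected by the postgres driver:

```yaml
# Sketch: accumulate per-device totals with an upsert via sql_raw.
output:
  sql_raw:
    driver: postgres
    dsn: postgres://user:password@localhost:5432/analytics
    query: |
      INSERT INTO device_totals (device_id, total)
      VALUES ($1, $2)
      ON CONFLICT (device_id)
      DO UPDATE SET total = device_totals.total + EXCLUDED.total
    args_mapping: 'root = [ this.device_id, this.total ]'
```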