# MySQL with Expanso Edge

Connect Expanso Edge to MySQL (and MariaDB) databases for reading, writing, and processing data at the edge. Use MySQL for cloud database integration, data replication, and analytics.
## Connection String (DSN)

MySQL connections use the following DSN format:

```yaml
dsn: user:password@tcp(host:port)/database
```
Examples:

```yaml
# Local MySQL
dsn: root:password@tcp(localhost:3306)/mydb

# Remote MySQL with explicit host
dsn: myuser:mypass@tcp(mysql.example.com:3306)/production

# MySQL over Unix socket
dsn: user:pass@unix(/var/run/mysqld/mysqld.sock)/dbname

# With parameters
dsn: user:pass@tcp(host:3306)/db?charset=utf8mb4&parseTime=true

# Skip TLS verification (not recommended for production)
dsn: user:pass@tcp(host:3306)/db?tls=skip-verify
```
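This DSN format follows go-sql-driver/mysql, which also accepts timeout parameters; these are worth setting on edge deployments with unreliable links. A minimal sketch (parameter names from that driver, values illustrative):

```yaml
# Fail fast on dead links: 5s to connect, 30s per read/write
dsn: user:pass@tcp(host:3306)/db?timeout=5s&readTimeout=30s&writeTimeout=30s
```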
## Reading from MySQL

### Query a Table

Use `sql_select` to read rows from a MySQL table:
```yaml
input:
  sql_select:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/analytics
    table: events
    columns: ['event_id', 'user_id', 'event_type', 'created_at']
    where: created_at > ?
    args_mapping: 'root = [ now().ts_sub("1h") ]'
    order_by: created_at ASC

pipeline:
  processors:
    - mapping: |
        root = this
        root.processed_at = now()

output:
  stdout: {}
```
### Complex Queries with Joins

Use `sql_raw` for complex queries:
```yaml
input:
  sql_raw:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/analytics
    query: |
      SELECT
        u.user_id,
        u.email,
        COUNT(o.order_id) as order_count,
        SUM(o.amount) as total_spent
      FROM users u
      LEFT JOIN orders o ON u.user_id = o.user_id
      WHERE u.created_at > ?
      GROUP BY u.user_id, u.email
      HAVING order_count > 0
    args_mapping: 'root = [ now().ts_sub("30d") ]'
```
## Writing to MySQL

### Insert Rows

Use `sql_insert` to insert data into MySQL:
```yaml
input:
  http_server:
    path: /events

pipeline:
  processors:
    - mapping: |
        root.user_id = this.user.id
        root.event_type = this.event
        root.payload = this.string() # serialize the document as a JSON string
        root.created_at = now()

output:
  sql_insert:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/analytics
    table: events
    columns: ['user_id', 'event_type', 'payload', 'created_at']
    args_mapping: |
      root = [
        this.user_id,
        this.event_type,
        this.payload,
        this.created_at
      ]
    batching:
      count: 100
      period: 5s
```
### Upsert (INSERT ... ON DUPLICATE KEY UPDATE)

Handle duplicate keys with MySQL's upsert syntax:
```yaml
output:
  sql_insert:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/analytics
    table: user_stats
    columns: ['user_id', 'login_count', 'last_login']
    args_mapping: 'root = [ this.user_id, 1, now() ]'
    suffix: |
      ON DUPLICATE KEY UPDATE
        login_count = login_count + 1,
        last_login = VALUES(last_login)
```
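Note that `VALUES()` inside `ON DUPLICATE KEY UPDATE` is deprecated as of MySQL 8.0.20. On 8.0.19 and later you can use a row alias instead; a sketch of the same upsert, assuming the server supports the alias syntax:

```yaml
output:
  sql_insert:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/analytics
    table: user_stats
    columns: ['user_id', 'login_count', 'last_login']
    args_mapping: 'root = [ this.user_id, 1, now() ]'
    suffix: |
      AS new
      ON DUPLICATE KEY UPDATE
        login_count = login_count + 1,
        last_login = new.last_login
```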
### Custom Queries

Use `sql_raw` for UPDATE, DELETE, or custom SQL:
```yaml
output:
  sql_raw:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/db
    query: |
      UPDATE products
      SET stock = stock - ?
      WHERE product_id = ? AND stock >= ?
    args_mapping: |
      root = [
        this.quantity,
        this.product_id,
        this.quantity
      ]
```
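The same component works for scheduled maintenance. A sketch of a hypothetical retention job that prunes rows older than 30 days once an hour, using a `generate` input as the trigger:

```yaml
input:
  generate:
    interval: 1h
    mapping: 'root = {}' # empty message, used only as a trigger

output:
  sql_raw:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/db
    query: DELETE FROM events WHERE created_at < NOW() - INTERVAL 30 DAY
```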
## Common Use Cases

### Data Replication from MySQL to PostgreSQL

Replicate data from MySQL to a PostgreSQL analytics database:
```yaml
input:
  sql_select:
    driver: mysql
    dsn: user:pass@tcp(mysql-source:3306)/production
    table: orders
    columns: ['order_id', 'customer_id', 'amount', 'status', 'created_at']
    where: synced = 0
    order_by: order_id ASC

output:
  broker:
    pattern: fan_out
    outputs:
      # Write to PostgreSQL
      - sql_insert:
          driver: postgres
          dsn: postgres://user:pass@postgres-dest:5432/analytics
          table: orders
          columns: ['order_id', 'customer_id', 'amount', 'status', 'created_at']
          args_mapping: |
            root = [
              this.order_id,
              this.customer_id,
              this.amount,
              this.status,
              this.created_at
            ]
          batching:
            count: 500
            period: 10s
      # Mark as synced in MySQL
      - sql_raw:
          driver: mysql
          dsn: user:pass@tcp(mysql-source:3306)/production
          query: 'UPDATE orders SET synced = 1 WHERE order_id = ?'
          args_mapping: 'root = [ this.order_id ]'
```
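One caveat: `fan_out` writes to both outputs concurrently, so there is no ordering between the PostgreSQL insert and the synced update. If Expanso Edge supports the Benthos-style `fan_out_sequential` broker pattern, swapping it in makes the UPDATE wait for the insert to be acknowledged first; a sketch under that assumption:

```yaml
output:
  broker:
    # Writes to each output in order; the UPDATE only runs after
    # the PostgreSQL insert has succeeded
    pattern: fan_out_sequential
    outputs:
      - sql_insert:
          driver: postgres
          dsn: postgres://user:pass@postgres-dest:5432/analytics
          table: orders
          columns: ['order_id', 'customer_id', 'amount', 'status', 'created_at']
          args_mapping: 'root = [ this.order_id, this.customer_id, this.amount, this.status, this.created_at ]'
      - sql_raw:
          driver: mysql
          dsn: user:pass@tcp(mysql-source:3306)/production
          query: 'UPDATE orders SET synced = 1 WHERE order_id = ?'
          args_mapping: 'root = [ this.order_id ]'
```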
### Aggregate Before Insert

Reduce MySQL writes by aggregating at the edge:
```yaml
input:
  kafka:
    addresses: ['localhost:9092']
    topics: ['metrics']

buffer:
  system_window:
    timestamp_mapping: 'root = this.timestamp.ts_parse("2006-01-02T15:04:05Z")'
    size: 5m

pipeline:
  processors:
    - group_by_value:
        value: '${! json("metric_name") }'
    - mapping: |
        root.metric_name = @group_key
        root.min_value = this.map_each(v -> v.value).min()
        root.max_value = this.map_each(v -> v.value).max()
        root.avg_value = this.map_each(v -> v.value).average()
        root.count = this.length()
        root.window_start = @window_start_timestamp
        root.window_end = @window_end_timestamp

output:
  sql_insert:
    driver: mysql
    dsn: user:pass@tcp(mysql-cloud:3306)/metrics
    table: metric_aggregates
    columns: ['metric_name', 'min_value', 'max_value', 'avg_value', 'count', 'window_start', 'window_end']
    args_mapping: |
      root = [
        this.metric_name,
        this.min_value,
        this.max_value,
        this.avg_value,
        this.count,
        this.window_start,
        this.window_end
      ]
    batching:
      count: 50
      period: 30s
```
## Database Initialization

### Create Tables on Startup

Use `init_statement` to run SQL when the component first connects, ensuring tables exist before writes begin:
```yaml
output:
  sql_insert:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/analytics
    table: events
    columns: ['event_id', 'event_type', 'created_at']
    args_mapping: 'root = [ this.id, this.type, now() ]'
    init_statement: |
      CREATE TABLE IF NOT EXISTS events (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        event_id VARCHAR(255) NOT NULL UNIQUE,
        event_type VARCHAR(100) NOT NULL,
        created_at DATETIME NOT NULL,
        INDEX idx_event_type (event_type),
        INDEX idx_created_at (created_at)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```
## Performance Tips

### Use Batching for Bulk Inserts

Batching significantly improves MySQL insert performance:
```yaml
output:
  sql_insert:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/db
    table: events
    columns: ['data']
    args_mapping: 'root = [ this.string() ]'
    batching:
      count: 1000 # Insert 1000 rows at once
      period: 10s
```
Performance impact: batched inserts are typically 10-100x faster than inserting rows one at a time.
### Connection Pooling

Configure connection pools for high-throughput scenarios:
```yaml
output:
  sql_insert:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/db
    table: events
    columns: ['data']
    args_mapping: 'root = [ this ]'
    conn_max_open: 20       # max open connections to the database
    conn_max_idle: 5        # idle connections kept in the pool
    conn_max_idle_time: 5m  # close connections idle longer than this
    conn_max_life_time: 30m # recycle connections older than this
```
### Use InnoDB Engine

Always use InnoDB for production tables; it has been the default engine since MySQL 5.5 and supports transactions and row-level locking for better concurrency:
```sql
CREATE TABLE events (
  ...
) ENGINE=InnoDB;
```
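Existing MyISAM tables can be converted in place with standard DDL (table name illustrative):

```sql
ALTER TABLE events ENGINE=InnoDB;
```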
## AWS RDS MySQL

### Connect to RDS with SSL

Enable TLS with `tls=true` in the DSN. This verifies the server certificate against the host's trust store, which typically means installing the RDS CA bundle on the host:
```yaml
output:
  sql_insert:
    driver: mysql
    dsn: user:pass@tcp(my-rds-instance.region.rds.amazonaws.com:3306)/mydb?tls=true
    table: events
    columns: ['data']
    args_mapping: 'root = [ this ]'
```
### Use IAM Authentication

Connect to RDS using IAM database authentication:
```yaml
output:
  sql_insert:
    driver: mysql
    dsn: tcp(my-rds-instance.region.rds.amazonaws.com:3306)/mydb
    table: events
    columns: ['data']
    args_mapping: 'root = [ this ]'
    secret_name: prod/mysql/credentials
    region: us-east-1
```
## Troubleshooting

### Connection Refused

Error: `dial tcp: connection refused`

Solutions:

- Verify MySQL is running: `systemctl status mysql`
- Check MySQL is listening on the correct port: `netstat -tlnp | grep 3306`
- Ensure the firewall allows connections
- For remote connections, verify the MySQL bind address: set `bind-address = 0.0.0.0` in `my.cnf`
### Access Denied

Error: `Access denied for user`

Solutions:

- Verify the username and password
- Grant permissions: `GRANT ALL PRIVILEGES ON dbname.* TO 'user'@'%';`
- For remote connections, ensure the user is created for the correct host:

```sql
CREATE USER 'user'@'%' IDENTIFIED BY 'password';
GRANT ALL PRIVILEGES ON dbname.* TO 'user'@'%';
FLUSH PRIVILEGES;
```
### Too Many Connections

Error: `Too many connections`

Solutions:

- Reduce `conn_max_open` in your pipeline
- Increase MySQL `max_connections`: `SET GLOBAL max_connections = 500;`
- Check for connection leaks in other applications
### Character Encoding Issues

Error: garbled characters or emoji failing to display

Solution: use the `utf8mb4` character set:

```yaml
dsn: user:pass@tcp(host:3306)/db?charset=utf8mb4
```

And ensure the table uses `utf8mb4`:

```sql
CREATE TABLE events (...) DEFAULT CHARSET=utf8mb4;
```
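An existing table and its string columns can be converted in place (table name illustrative):

```sql
ALTER TABLE events CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
```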
## Next Steps
- Database Components - Full component reference
- Common Patterns - More real-world examples
- Best Practices - Performance and optimization tips
- PostgreSQL Guide - For PostgreSQL-specific features