MySQL with Expanso Edge

Connect Expanso Edge to MySQL (and MariaDB) databases for reading, writing, and processing data at the edge. Use MySQL for cloud database integration, data replication, and analytics.

Connection String (DSN)

MySQL connections use the following DSN format:

dsn: user:password@tcp(host:port)/database

Examples:

# Local MySQL
dsn: root:password@tcp(localhost:3306)/mydb

# Remote MySQL with explicit host
dsn: myuser:mypass@tcp(mysql.example.com:3306)/production

# MySQL over Unix socket
dsn: user:pass@unix(/var/run/mysqld/mysqld.sock)/dbname

# With parameters
dsn: user:pass@tcp(host:3306)/db?charset=utf8mb4&parseTime=true

# Skip TLS verification (not recommended for production)
dsn: user:pass@tcp(host:3306)/db?tls=skip-verify
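
Hardcoding credentials in a pipeline file makes them easy to leak. One option is to pass the whole DSN in from the environment. The sketch below assumes Expanso Edge resolves `${VAR}` references in configuration the way Benthos-style pipelines do; `MYSQL_DSN` is a hypothetical variable name, so check the Expanso configuration reference before relying on it.

```yaml
# Assumption: ${MYSQL_DSN} is interpolated from the environment at startup.
# Example value:
#   user:pass@tcp(mysql.example.com:3306)/production?charset=utf8mb4&parseTime=true
input:
  sql_select:
    driver: mysql
    dsn: "${MYSQL_DSN}"
    table: events
    columns: ['event_id', 'created_at']
```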

Reading from MySQL

Query a Table

Use sql_select to read rows from a MySQL table:

input:
  sql_select:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/analytics
    table: events
    columns: ['event_id', 'user_id', 'event_type', 'created_at']
    where: created_at > ?
    args_mapping: 'root = [ now().ts_sub_iso8601("PT1H") ]'
    order_by: created_at ASC

pipeline:
  processors:
    - mapping: |
        root = this
        root.processed_at = now()

output:
  stdout: {}

Complex Queries with Joins

Use sql_raw for complex queries:

input:
  sql_raw:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/analytics
    query: |
      SELECT
        u.user_id,
        u.email,
        COUNT(o.order_id) as order_count,
        SUM(o.amount) as total_spent
      FROM users u
      LEFT JOIN orders o ON u.user_id = o.user_id
      WHERE u.created_at > ?
      GROUP BY u.user_id, u.email
      HAVING order_count > 0
    args_mapping: 'root = [ now().ts_sub_iso8601("P30D") ]'

Writing to MySQL

Insert Rows

Use sql_insert to insert data into MySQL:

input:
  http_server:
    path: /events

pipeline:
  processors:
    - mapping: |
        root.user_id = this.user.id
        root.event_type = this.event
        root.payload = this.format_json().string()
        root.created_at = now()

output:
  sql_insert:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/analytics
    table: events
    columns: ['user_id', 'event_type', 'payload', 'created_at']
    args_mapping: |
      root = [
        this.user_id,
        this.event_type,
        this.payload,
        this.created_at
      ]
    batching:
      count: 100
      period: 5s

Upsert (INSERT ... ON DUPLICATE KEY UPDATE)

Handle duplicate keys with MySQL's upsert syntax:

output:
  sql_insert:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/analytics
    table: user_stats
    columns: ['user_id', 'login_count', 'last_login']
    args_mapping: 'root = [ this.user_id, 1, now() ]'
    suffix: |
      ON DUPLICATE KEY UPDATE
        login_count = login_count + 1,
        last_login = VALUES(last_login)

Custom Queries

Use sql_raw for UPDATE, DELETE, or custom SQL:

output:
  sql_raw:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/db
    query: |
      UPDATE products
      SET stock = stock - ?
      WHERE product_id = ? AND stock >= ?
    args_mapping: |
      root = [
        this.quantity,
        this.product_id,
        this.quantity
      ]

Common Use Cases

Data Replication from MySQL to PostgreSQL

Replicate data from MySQL to a PostgreSQL analytics database:

input:
  sql_select:
    driver: mysql
    dsn: user:pass@tcp(mysql-source:3306)/production
    table: orders
    columns: ['order_id', 'customer_id', 'amount', 'status', 'created_at']
    where: synced = 0
    order_by: order_id ASC

output:
  broker:
    pattern: fan_out
    outputs:
      # Write to PostgreSQL
      - sql_insert:
          driver: postgres
          dsn: postgres://user:pass@postgres-dest:5432/analytics
          table: orders
          columns: ['order_id', 'customer_id', 'amount', 'status', 'created_at']
          args_mapping: |
            root = [
              this.order_id,
              this.customer_id,
              this.amount,
              this.status,
              this.created_at
            ]
          batching:
            count: 500
            period: 10s

      # Mark as synced in MySQL
      - sql_raw:
          driver: mysql
          dsn: user:pass@tcp(mysql-source:3306)/production
          query: 'UPDATE orders SET synced = 1 WHERE order_id = ?'
          args_mapping: 'root = [ this.order_id ]'
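
Because the insert and the `synced` update run as separate outputs, a row can be written to PostgreSQL and then read again if the MySQL update fails or the pipeline restarts. A minimal sketch of making the PostgreSQL side safe to retry, reusing the `suffix` field from the upsert example; it assumes the destination `orders` table has a unique constraint on `order_id`:

```yaml
# Drop-in replacement for the PostgreSQL branch of the broker above.
- sql_insert:
    driver: postgres
    dsn: postgres://user:pass@postgres-dest:5432/analytics
    table: orders
    columns: ['order_id', 'customer_id', 'amount', 'status', 'created_at']
    args_mapping: |
      root = [
        this.order_id,
        this.customer_id,
        this.amount,
        this.status,
        this.created_at
      ]
    # Assumption: UNIQUE (order_id) exists on the destination table,
    # so a replayed row is skipped instead of duplicated.
    suffix: ON CONFLICT (order_id) DO NOTHING
    batching:
      count: 500
      period: 10s
```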

Aggregate Before Insert

Reduce MySQL writes by aggregating at the edge:

input:
  kafka:
    addresses: ['localhost:9092']
    topics: ['metrics']

buffer:
  system_window:
    timestamp_mapping: 'root = this.timestamp.ts_parse("2006-01-02T15:04:05Z")'
    size: 5m

pipeline:
  processors:
    - group_by_value:
        value: '${! json("metric_name") }'
    - mapping: |
        root.metric_name = @group_key
        root.min_value = this.map_each(v -> v.value).min()
        root.max_value = this.map_each(v -> v.value).max()
        root.avg_value = this.map_each(v -> v.value).average()
        root.count = this.length()
        root.window_start = @window_start_timestamp
        root.window_end = @window_end_timestamp

output:
  sql_insert:
    driver: mysql
    dsn: user:pass@tcp(mysql-cloud:3306)/metrics
    table: metric_aggregates
    columns: ['metric_name', 'min_value', 'max_value', 'avg_value', 'count', 'window_start', 'window_end']
    args_mapping: |
      root = [
        this.metric_name,
        this.min_value,
        this.max_value,
        this.avg_value,
        this.count,
        this.window_start,
        this.window_end
      ]
    batching:
      count: 50
      period: 30s

Database Initialization

Create Tables on Startup

Use init_statement to ensure tables exist:

output:
  sql_insert:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/analytics
    table: events
    columns: ['event_id', 'event_type', 'created_at']
    args_mapping: 'root = [ this.id, this.type, now() ]'
    init_statement: |
      CREATE TABLE IF NOT EXISTS events (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        event_id VARCHAR(255) NOT NULL UNIQUE,
        event_type VARCHAR(100) NOT NULL,
        created_at DATETIME NOT NULL,
        INDEX idx_event_type (event_type),
        INDEX idx_created_at (created_at)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Performance Tips

Use Batching for Bulk Inserts

Batching significantly improves MySQL insert performance:

output:
  sql_insert:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/db
    table: events
    columns: ['data']
    args_mapping: 'root = [ this.format_json().string() ]'
    batching:
      count: 1000 # Insert up to 1000 rows at once
      period: 10s # Or flush whatever has accumulated after 10 seconds

Performance impact: batched inserts are often 10-100x faster than individual inserts, depending on row size and network latency to the database.

Connection Pooling

Configure connection pools for high-throughput scenarios:

output:
  sql_insert:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/db
    table: events
    columns: ['data']
    args_mapping: 'root = [ this ]'
    conn_max_open: 20
    conn_max_idle: 5
    conn_max_idle_time: 5m
    conn_max_life_time: 30m

Use InnoDB Engine

Always use InnoDB for production tables (supports transactions and better concurrency):

CREATE TABLE events (
...
) ENGINE=InnoDB;

AWS RDS MySQL

Connect to RDS with SSL

output:
  sql_insert:
    driver: mysql
    dsn: user:pass@tcp(my-rds-instance.region.rds.amazonaws.com:3306)/mydb?tls=true
    table: events
    columns: ['data']
    args_mapping: 'root = [ this ]'

Use IAM Authentication

Connect to RDS using IAM database authentication:

output:
  sql_insert:
    driver: mysql
    dsn: tcp(my-rds-instance.region.rds.amazonaws.com:3306)/mydb
    table: events
    columns: ['data']
    args_mapping: 'root = [ this ]'
    secret_name: prod/mysql/credentials
    region: us-east-1

Troubleshooting

Connection Refused

Error: dial tcp: connection refused

Solutions:

  • Verify MySQL is running: systemctl status mysql
  • Check MySQL is listening on the correct port: netstat -tlnp | grep 3306
  • Ensure firewall allows connections
  • For remote connections, verify MySQL bind-address: bind-address = 0.0.0.0 in my.cnf

Access Denied

Error: Access denied for user

Solutions:

  • Verify username and password
  • Grant permissions: GRANT ALL PRIVILEGES ON dbname.* TO 'user'@'%';
  • For remote connections, ensure user is created for correct host:
    CREATE USER 'user'@'%' IDENTIFIED BY 'password';
    GRANT ALL PRIVILEGES ON dbname.* TO 'user'@'%';
    FLUSH PRIVILEGES;

Too Many Connections

Error: Too many connections

Solutions:

  • Reduce conn_max_open in your pipeline
  • Increase MySQL max_connections: SET GLOBAL max_connections = 500;
  • Check for connection leaks in other applications
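
As a rough way to size `conn_max_open`, divide the connections the server can spare by the number of pipelines that share it. For example, with `max_connections = 150`, about 50 of those reserved for other applications, and 10 edge nodes running the same pipeline, each node can hold at most (150 - 50) / 10 = 10 connections. A sketch with those illustrative numbers, using the pool fields from the Connection Pooling section:

```yaml
output:
  sql_insert:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/db
    table: events
    columns: ['data']
    # Raw message payload as a string; adjust to your own schema.
    args_mapping: 'root = [ content().string() ]'
    conn_max_open: 10       # (150 - 50) / 10 nodes
    conn_max_idle: 2        # keep only a few idle connections per node
    conn_max_idle_time: 1m  # release idle connections quickly
    conn_max_life_time: 30m
```

The numbers are assumptions for the sake of the arithmetic, not tuned recommendations; measure actual connection usage with `SHOW STATUS LIKE 'Threads_connected';` before settling on values.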

Character Encoding Issues

Error: Garbled characters or emoji not displaying

Solution: Use utf8mb4 character set:

dsn: user:pass@tcp(host:3306)/db?charset=utf8mb4

And ensure table uses utf8mb4:

CREATE TABLE events (...) DEFAULT CHARSET=utf8mb4;
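
Both settings can live in one output: the DSN sets the connection character set, and `init_statement` (see Database Initialization) creates the table with a matching one. A minimal sketch; the `comments` table and its `body` column are hypothetical:

```yaml
output:
  sql_insert:
    driver: mysql
    dsn: user:pass@tcp(localhost:3306)/db?charset=utf8mb4
    table: comments
    columns: ['body']
    args_mapping: 'root = [ this.body ]'
    init_statement: |
      CREATE TABLE IF NOT EXISTS comments (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        body TEXT NOT NULL
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```

`CREATE TABLE IF NOT EXISTS` leaves an existing table untouched, so a table created earlier with a different character set has to be converted separately, for example with `ALTER TABLE comments CONVERT TO CHARACTER SET utf8mb4;`.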

Next Steps