PostgreSQL with Expanso Edge
Connect Expanso Edge to PostgreSQL databases for reading, writing, and processing data at the edge. Use PostgreSQL for cloud database integration, analytics, and data replication.
Connection String (DSN)
PostgreSQL connections use the following DSN format:
dsn: postgres://user:password@host:port/database
Examples:
# Local PostgreSQL
dsn: postgres://postgres:password@localhost:5432/mydb
# Remote PostgreSQL with SSL disabled
dsn: postgres://user:pass@db.example.com:5432/production?sslmode=disable
# With SSL required
dsn: postgres://user:pass@db.example.com:5432/production?sslmode=require
# Connection timeout
dsn: postgres://user:pass@host:5432/db?connect_timeout=10
# Application name for monitoring
dsn: postgres://user:pass@host:5432/db?application_name=expanso-edge
SSL modes:
- disable - No SSL (not recommended for production)
- require - SSL required, but don't verify the certificate
- verify-ca - SSL required, verify the CA
- verify-full - SSL required, verify the CA and hostname
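For verify-ca and verify-full, the driver also needs a CA bundle to check the server certificate against. A typical DSN looks like this (the certificate path is an assumption for your environment):
# SSL with CA and hostname verification (example CA bundle path)
dsn: postgres://user:pass@db.example.com:5432/production?sslmode=verify-full&sslrootcert=/etc/ssl/certs/ca.pem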
Reading from PostgreSQL
Query a Table
Use sql_select to read rows from a PostgreSQL table:
input:
  sql_select:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/analytics?sslmode=disable
    table: events
    columns: ['event_id', 'user_id', 'event_type', 'created_at']
    where: created_at > $1
    args_mapping: 'root = [ now().ts_sub("1h") ]'
    order_by: created_at ASC
pipeline:
  processors:
    - mapping: |
        root = this
        root.processed_at = now()
output:
  stdout: {}
Unlike MySQL, which uses ?, PostgreSQL uses $1, $2, $3 for parameter placeholders.
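For example, a filter with two parameters numbers each placeholder in order and supplies matching values through args_mapping (a small sketch; the column and values are assumptions):
where: event_type = $1 AND created_at > $2
args_mapping: 'root = [ "login", now().ts_sub("1h") ]'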
Complex Queries with CTEs
Use sql_raw for advanced queries with Common Table Expressions:
input:
  sql_raw:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/analytics?sslmode=disable
    query: |
      WITH recent_orders AS (
        SELECT
          user_id,
          COUNT(*) as order_count,
          SUM(amount) as total_spent
        FROM orders
        WHERE created_at > $1
        GROUP BY user_id
      )
      SELECT
        u.user_id,
        u.email,
        u.tier,
        COALESCE(ro.order_count, 0) as order_count,
        COALESCE(ro.total_spent, 0) as total_spent
      FROM users u
      LEFT JOIN recent_orders ro ON u.user_id = ro.user_id
      WHERE u.tier = $2
    args_mapping: 'root = [ now().ts_sub("30d"), "premium" ]'
Writing to PostgreSQL
Insert Rows
Use sql_insert to insert data into PostgreSQL:
input:
  http_server:
    path: /events
pipeline:
  processors:
    - mapping: |
        root.user_id = this.user.id
        root.event_type = this.event
        root.payload = this.encode("json")
        root.created_at = now()
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/analytics?sslmode=disable
    table: events
    columns: ['user_id', 'event_type', 'payload', 'created_at']
    args_mapping: |
      root = [
        this.user_id,
        this.event_type,
        this.payload,
        this.created_at
      ]
    batching:
      count: 100
      period: 5s
Upsert (INSERT ... ON CONFLICT)
Handle conflicts with PostgreSQL's upsert syntax:
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/analytics?sslmode=disable
    table: user_stats
    columns: ['user_id', 'login_count', 'last_login']
    args_mapping: 'root = [ this.user_id, 1, now() ]'
    suffix: |
      ON CONFLICT (user_id) DO UPDATE SET
        login_count = user_stats.login_count + 1,
        last_login = EXCLUDED.last_login
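The suffix is appended to the generated INSERT, so each row is handled with SQL roughly equivalent to the following (illustrative only; the component builds the actual statement):
INSERT INTO user_stats (user_id, login_count, last_login)
VALUES ($1, $2, $3)
ON CONFLICT (user_id) DO UPDATE SET
  login_count = user_stats.login_count + 1,
  last_login = EXCLUDED.last_login;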
Insert with RETURNING
Get inserted values back using RETURNING:
output:
  sql_raw:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    query: |
      INSERT INTO orders (customer_id, amount, status)
      VALUES ($1, $2, $3)
      RETURNING order_id, created_at
    args_mapping: |
      root = [
        this.customer_id,
        this.amount,
        "pending"
      ]
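Because an output is the end of the pipeline, the returned values are not fed back into it. If downstream steps need order_id and created_at, one option is to run the same statement as a sql_raw processor instead, so the result rows replace the message and remain available to later processors (a sketch, assuming the processor form of sql_raw is available in your build):
pipeline:
  processors:
    - sql_raw:
        driver: postgres
        dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
        query: |
          INSERT INTO orders (customer_id, amount, status)
          VALUES ($1, $2, $3)
          RETURNING order_id, created_at
        args_mapping: 'root = [ this.customer_id, this.amount, "pending" ]'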
PostgreSQL-Specific Features
JSONB Columns
Store and query JSON data efficiently:
# Insert JSONB data
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    table: events
    columns: ['event_type', 'metadata']
    args_mapping: |
      root = [
        this.event_type,
        this.metadata.encode("json")   # Stored as JSONB
      ]
    init_statement: |
      CREATE TABLE IF NOT EXISTS events (
        id SERIAL PRIMARY KEY,
        event_type TEXT NOT NULL,
        metadata JSONB NOT NULL,
        created_at TIMESTAMP DEFAULT NOW()
      );
      CREATE INDEX IF NOT EXISTS idx_metadata_gin ON events USING GIN (metadata);
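Reading JSONB back works with ordinary PostgreSQL operators. For instance, a sql_raw input can project a key with ->> and filter with the containment operator @>, which the GIN index above can serve (a sketch; the metadata keys and values are assumptions):
input:
  sql_raw:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    query: |
      SELECT id, event_type, metadata->>'source' AS source
      FROM events
      WHERE metadata @> '{"level": "error"}'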
Arrays
Use PostgreSQL arrays:
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    table: user_tags
    columns: ['user_id', 'tags']
    args_mapping: |
      root = [
        this.user_id,
        "{" + this.tags.join(",") + "}"   # PostgreSQL array literal format
      ]
    init_statement: |
      CREATE TABLE IF NOT EXISTS user_tags (
        user_id TEXT PRIMARY KEY,
        tags TEXT[] NOT NULL
      );
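Querying array columns also uses standard PostgreSQL syntax, for example matching rows that contain a given tag (a sketch; the tag value is an assumption):
input:
  sql_raw:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    query: SELECT user_id, tags FROM user_tags WHERE 'admin' = ANY(tags)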
COPY for Bulk Loading
For extremely high-throughput inserts, use PostgreSQL COPY:
output:
  sql_raw:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    query: |
      COPY events (user_id, event_type, payload, created_at)
      FROM STDIN WITH (FORMAT CSV)
    args_mapping: |
      root = [
        this.user_id + "," +
        this.event_type + "," +
        this.payload.encode("json").escape_csv() + "," +
        now().format_timestamp("2006-01-02 15:04:05", "UTC")
      ]
Common Use Cases
Data Enrichment with Multiple Lookups
Enrich streaming data with PostgreSQL lookups:
input:
  kafka:
    addresses: ['localhost:9092']
    topics: ['user-events']
pipeline:
  processors:
    - mapping: 'root = this.parse_json()'
    # Look up user details in a branch so the original event fields are preserved
    - branch:
        processors:
          - sql_select:
              driver: postgres
              dsn: postgres://user:pass@users-db:5432/users?sslmode=disable
              table: users
              columns: ['name', 'email', 'tier', 'preferences']
              where: user_id = $1
              args_mapping: 'root = [ this.user_id ]'
              result_codec: json
        result_map: |
          root.user = if this.length() > 0 { this.index(0) } else { null }
    # Combine data
    - mapping: |
        root.event_id = this.event_id
        root.user_name = this.user.name.or("unknown")
        root.user_tier = this.user.tier.or("free")
        root.timestamp = now()
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@analytics-db:5432/events?sslmode=disable
    table: enriched_events
    columns: ['event_id', 'user_name', 'user_tier', 'timestamp']
    args_mapping: |
      root = [
        this.event_id,
        this.user_name,
        this.user_tier,
        this.timestamp
      ]
Time-Series Data with TimescaleDB
If you're using the TimescaleDB extension:
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@timescale:5432/metrics?sslmode=disable
    table: sensor_data
    columns: ['time', 'sensor_id', 'temperature', 'humidity']
    args_mapping: |
      root = [
        now(),
        this.sensor_id,
        this.temperature,
        this.humidity
      ]
    init_statement: |
      CREATE TABLE IF NOT EXISTS sensor_data (
        time TIMESTAMPTZ NOT NULL,
        sensor_id TEXT NOT NULL,
        temperature REAL,
        humidity REAL
      );
      SELECT create_hypertable('sensor_data', 'time', if_not_exists => TRUE);
    batching:
      count: 1000
      period: 10s
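If you also want old readings dropped automatically, a TimescaleDB retention policy can be added in the same init_statement (a sketch; the 30-day window is an assumption):
SELECT add_retention_policy('sensor_data', INTERVAL '30 days', if_not_exists => TRUE);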
Database Initialization
Create Tables with PostgreSQL Features
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    table: events
    columns: ['event_id', 'event_type', 'metadata', 'created_at']
    args_mapping: |
      root = [
        this.id,
        this.type,
        this.data.encode("json"),
        now()
      ]
    init_statement: |
      CREATE TABLE IF NOT EXISTS events (
        id BIGSERIAL PRIMARY KEY,
        event_id UUID NOT NULL UNIQUE,
        event_type TEXT NOT NULL,
        metadata JSONB NOT NULL DEFAULT '{}',
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
      );
      CREATE INDEX IF NOT EXISTS idx_event_type ON events (event_type);
      CREATE INDEX IF NOT EXISTS idx_created_at ON events (created_at DESC);
      CREATE INDEX IF NOT EXISTS idx_metadata_gin ON events USING GIN (metadata);
Performance Tips
Use Batching for Bulk Inserts
Batching dramatically improves PostgreSQL insert performance:
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    table: events
    columns: ['data']
    args_mapping: 'root = [ this.encode("json") ]'
    batching:
      count: 1000
      period: 10s
Performance impact: 10-100x faster than individual inserts.
Connection Pooling
Configure connection pools:
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    table: events
    columns: ['data']
    args_mapping: 'root = [ this ]'
    conn_max_open: 20
    conn_max_idle: 5
    conn_max_idle_time: 5m
    conn_max_life_time: 30m
Use Appropriate Indexes
For JSONB queries, use GIN indexes:
CREATE INDEX idx_metadata_gin ON events USING GIN (metadata);
For time-series data, use BRIN indexes:
CREATE INDEX idx_created_at_brin ON events USING BRIN (created_at);
AWS RDS PostgreSQL
Connect with SSL
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@my-rds-instance.region.rds.amazonaws.com:5432/mydb?sslmode=require
    table: events
    columns: ['data']
    args_mapping: 'root = [ this ]'
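If you want the driver to verify the RDS server certificate as well, you can point sslrootcert at the AWS CA bundle downloaded from AWS (the bundle path and filename here are assumptions for your environment):
dsn: postgres://user:pass@my-rds-instance.region.rds.amazonaws.com:5432/mydb?sslmode=verify-full&sslrootcert=/etc/ssl/certs/global-bundle.pem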
Use IAM Authentication
Connect to RDS using IAM database authentication:
output:
  sql_insert:
    driver: postgres
    dsn: postgres://iamuser@my-rds-instance.region.rds.amazonaws.com:5432/mydb
    table: events
    columns: ['data']
    args_mapping: 'root = [ this ]'
    iam_enabled: true
    region: us-east-1
Azure Database for PostgreSQL
Connect with Azure AD Authentication
input:
  sql_select:
    driver: postgres
    dsn: postgres://user@myserver.postgres.database.azure.com:5432/db?sslmode=require
    table: users
    columns: ['id', 'email']
    azure:
      entra_enabled: true
      token_request_options:
        tenant_id: "your-tenant-id"
Troubleshooting
Connection Refused
Error: dial tcp: connection refused
Solutions:
- Verify PostgreSQL is running: systemctl status postgresql
- Check PostgreSQL is listening: netstat -tlnp | grep 5432
- Ensure the firewall allows connections
- For remote connections, set listen_addresses = '*' in postgresql.conf
- And add host all all 0.0.0.0/0 md5 to pg_hba.conf
SSL Connection Error
Error: SSL is not enabled on the server
Solution: Use sslmode=disable for local/development:
dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
Password Authentication Failed
Error: password authentication failed
Solutions:
- Verify username and password
- Check that pg_hba.conf allows password authentication: host all all 0.0.0.0/0 md5
- Reload PostgreSQL: pg_ctl reload
Too Many Connections
Error: remaining connection slots are reserved
Solutions:
- Reduce conn_max_open in your pipeline
- Increase max_connections in postgresql.conf: max_connections = 200
- Restart PostgreSQL
Next Steps
- Database Components - Full component reference
- Common Patterns - More real-world examples
- Best Practices - Performance and optimization tips
- MySQL Guide - For MySQL-specific features