PostgreSQL with Expanso Edge

Connect Expanso Edge to PostgreSQL databases for reading, writing, and processing data at the edge. Use PostgreSQL for cloud database integration, analytics, and data replication.

Connection String (DSN)

PostgreSQL connections use the following DSN format:

dsn: postgres://user:password@host:port/database

Examples:

# Local PostgreSQL
dsn: postgres://postgres:password@localhost:5432/mydb

# Remote PostgreSQL with SSL disabled
dsn: postgres://user:password@db.example.com:5432/production?sslmode=disable

# With SSL required
dsn: postgres://user:password@db.example.com:5432/production?sslmode=require

# Connection timeout
dsn: postgres://user:pass@host:5432/db?connect_timeout=10

# Application name for monitoring
dsn: postgres://user:pass@host:5432/db?application_name=expanso-edge
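
If the password contains characters that are special in URLs (such as @, / or #), percent-encode them in the DSN. A minimal sketch, assuming the password is literally p@ss/word:

# Special characters in the password must be percent-encoded
dsn: postgres://user:p%40ss%2Fword@localhost:5432/mydb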

SSL modes:

  • disable - No SSL (not recommended for production)
  • require - SSL required, but the server certificate is not verified
  • verify-ca - SSL required, server certificate verified against a trusted CA
  • verify-full - SSL required, certificate and hostname verified (see the example below)
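
For verify-ca and verify-full, the client also needs the CA certificate that signed the server's certificate, typically supplied via the sslrootcert parameter. A sketch, assuming the CA bundle is stored at /etc/ssl/certs/ca.pem:

# Full certificate and hostname verification
dsn: postgres://user:pass@db.example.com:5432/production?sslmode=verify-full&sslrootcert=/etc/ssl/certs/ca.pem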

Reading from PostgreSQL

Query a Table

Use sql_select to read rows from a PostgreSQL table:

input:
  sql_select:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/analytics?sslmode=disable
    table: events
    columns: ['event_id', 'user_id', 'event_type', 'created_at']
    where: created_at > $1
    args_mapping: 'root = [ now().ts_sub("1h") ]'
    order_by: created_at ASC

pipeline:
  processors:
    - mapping: |
        root = this
        root.processed_at = now()

output:
  stdout: {}

PostgreSQL uses $1, $2 for placeholders

Unlike MySQL, which uses ?, PostgreSQL uses $1, $2, $3 for parameter placeholders.
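
Numbered placeholders can also be reused within a single query, which has no direct equivalent with ?-style placeholders. A sketch, assuming a hypothetical accounts table:

input:
  sql_raw:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    query: |
      SELECT id, email
      FROM accounts
      WHERE created_at > $1 OR updated_at > $1
    args_mapping: 'root = [ now().ts_sub("24h") ]'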

Complex Queries with CTEs

Use sql_raw for advanced queries with Common Table Expressions:

input:
  sql_raw:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/analytics?sslmode=disable
    query: |
      WITH recent_orders AS (
        SELECT
          user_id,
          COUNT(*) as order_count,
          SUM(amount) as total_spent
        FROM orders
        WHERE created_at > $1
        GROUP BY user_id
      )
      SELECT
        u.user_id,
        u.email,
        u.tier,
        COALESCE(ro.order_count, 0) as order_count,
        COALESCE(ro.total_spent, 0) as total_spent
      FROM users u
      LEFT JOIN recent_orders ro ON u.user_id = ro.user_id
      WHERE u.tier = $2
    args_mapping: 'root = [ now().ts_sub("30d"), "premium" ]'

Writing to PostgreSQL

Insert Rows

Use sql_insert to insert data into PostgreSQL:

input:
  http_server:
    path: /events

pipeline:
  processors:
    - mapping: |
        root.user_id = this.user.id
        root.event_type = this.event
        root.payload = this.encode("json")
        root.created_at = now()

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/analytics?sslmode=disable
    table: events
    columns: ['user_id', 'event_type', 'payload', 'created_at']
    args_mapping: |
      root = [
        this.user_id,
        this.event_type,
        this.payload,
        this.created_at
      ]
    batching:
      count: 100
      period: 5s

Upsert (INSERT ... ON CONFLICT)

Handle conflicts with PostgreSQL's upsert syntax:

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/analytics?sslmode=disable
    table: user_stats
    columns: ['user_id', 'login_count', 'last_login']
    args_mapping: 'root = [ this.user_id, 1, now() ]'
    suffix: |
      ON CONFLICT (user_id) DO UPDATE SET
        login_count = user_stats.login_count + 1,
        last_login = EXCLUDED.last_login
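
If you only need idempotent inserts, for example when an upstream source may redeliver the same event, DO NOTHING skips conflicting rows instead of updating them. A sketch along the same lines, assuming events has a unique constraint on event_id:

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/analytics?sslmode=disable
    table: events
    columns: ['event_id', 'event_type']
    args_mapping: 'root = [ this.event_id, this.event_type ]'
    suffix: ON CONFLICT (event_id) DO NOTHING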

Insert with RETURNING

Get inserted values back using RETURNING:

output:
  sql_raw:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    query: |
      INSERT INTO orders (customer_id, amount, status)
      VALUES ($1, $2, $3)
      RETURNING order_id, created_at
    args_mapping: |
      root = [
        this.customer_id,
        this.amount,
        "pending"
      ]
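
When sql_raw is used as an output, the rows produced by RETURNING are not routed anywhere. If you need the generated order_id downstream, one option is to run the same statement as a processor and route the returned rows wherever you need them; a sketch, assuming sql_raw is also available as a processor (as in Benthos-style pipelines):

pipeline:
  processors:
    - sql_raw:
        driver: postgres
        dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
        query: |
          INSERT INTO orders (customer_id, amount, status)
          VALUES ($1, $2, $3)
          RETURNING order_id, created_at
        args_mapping: 'root = [ this.customer_id, this.amount, "pending" ]'

output:
  stdout: {}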

PostgreSQL-Specific Features

JSONB Columns

Store and query JSON data efficiently:

# Insert JSONB data
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    table: events
    columns: ['event_type', 'metadata']
    args_mapping: |
      root = [
        this.event_type,
        this.metadata.encode("json") # Stored as JSONB
      ]
    init_statement: |
      CREATE TABLE IF NOT EXISTS events (
        id SERIAL PRIMARY KEY,
        event_type TEXT NOT NULL,
        metadata JSONB NOT NULL,
        created_at TIMESTAMP DEFAULT NOW()
      );
      CREATE INDEX IF NOT EXISTS idx_metadata_gin ON events USING GIN (metadata);
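
Reading the JSONB data back works like any other query; the GIN index above accelerates containment (@>) lookups. A sketch, assuming the stored metadata documents carry a source field:

input:
  sql_raw:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    query: |
      SELECT id, event_type, metadata
      FROM events
      WHERE metadata @> $1::jsonb
    args_mapping: 'root = [ "{\"source\": \"mobile\"}" ]'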

Arrays

Use PostgreSQL arrays:

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    table: user_tags
    columns: ['user_id', 'tags']
    args_mapping: |
      root = [
        this.user_id,
        "{" + this.tags.join(",") + "}" # PostgreSQL array literal format
      ]
    init_statement: |
      CREATE TABLE IF NOT EXISTS user_tags (
        user_id TEXT PRIMARY KEY,
        tags TEXT[] NOT NULL
      );
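
To read rows back by tag, PostgreSQL's array operators apply; @> checks that the column contains all of the supplied elements. A sketch:

input:
  sql_raw:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    query: |
      SELECT user_id, tags
      FROM user_tags
      WHERE tags @> $1::text[]
    args_mapping: 'root = [ "{premium}" ]'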

COPY for Bulk Loading

For extremely high-throughput inserts, use PostgreSQL COPY:

output:
  sql_raw:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    query: |
      COPY events (user_id, event_type, payload, created_at)
      FROM STDIN WITH (FORMAT CSV)
    args_mapping: |
      root = [
        this.user_id + "," +
        this.event_type + "," +
        this.payload.encode("json").escape_csv() + "," +
        now().format_timestamp("2006-01-02 15:04:05", "UTC")
      ]

Common Use Cases

Data Enrichment with Multiple Lookups

Enrich streaming data with PostgreSQL lookups:

input:
  kafka:
    addresses: ['localhost:9092']
    topics: ['user-events']

pipeline:
  processors:
    - mapping: 'root = this.parse_json()'

    # Look up user details
    - sql_select:
        driver: postgres
        dsn: postgres://user:pass@users-db:5432/users?sslmode=disable
        table: users
        columns: ['name', 'email', 'tier', 'preferences']
        where: user_id = $1
        args_mapping: 'root = [ this.user_id ]'
        result_codec: json
    - mapping: |
        root.user = if this.length() > 0 { this.index(0) } else { null }

    # Combine data
    - mapping: |
        root.event_id = this.event_id
        root.user_name = this.user.name.or("unknown")
        root.user_tier = this.user.tier.or("free")
        root.timestamp = now()

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@analytics-db:5432/events?sslmode=disable
    table: enriched_events
    columns: ['event_id', 'user_name', 'user_tier', 'timestamp']
    args_mapping: |
      root = [
        this.event_id,
        this.user_name,
        this.user_tier,
        this.timestamp
      ]
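
In Benthos-style pipelines the sql_select processor replaces the message payload with its result set; if that is how your build behaves, fields from the original event (such as event_id) are no longer available to the final mapping above. One way to keep them is to run the lookup inside a branch processor, assuming one is available; a sketch of just the lookup portion:

pipeline:
  processors:
    - mapping: 'root = this.parse_json()'

    # The branch runs the lookup on a copy of the message;
    # only root.user is written back to the original event.
    - branch:
        processors:
          - sql_select:
              driver: postgres
              dsn: postgres://user:pass@users-db:5432/users?sslmode=disable
              table: users
              columns: ['name', 'email', 'tier', 'preferences']
              where: user_id = $1
              args_mapping: 'root = [ this.user_id ]'
        result_map: 'root.user = if this.length() > 0 { this.index(0) } else { null }'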

Time-Series Data with TimescaleDB

If you are using the TimescaleDB extension:

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@timescale:5432/metrics?sslmode=disable
    table: sensor_data
    columns: ['time', 'sensor_id', 'temperature', 'humidity']
    args_mapping: |
      root = [
        now(),
        this.sensor_id,
        this.temperature,
        this.humidity
      ]
    init_statement: |
      CREATE TABLE IF NOT EXISTS sensor_data (
        time TIMESTAMPTZ NOT NULL,
        sensor_id TEXT NOT NULL,
        temperature REAL,
        humidity REAL
      );
      SELECT create_hypertable('sensor_data', 'time', if_not_exists => TRUE);
    batching:
      count: 1000
      period: 10s
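
Other TimescaleDB housekeeping, such as a retention policy, can go in the same init_statement. A sketch of just that field, assuming TimescaleDB 2.x where add_retention_policy is available:

    init_statement: |
      SELECT create_hypertable('sensor_data', 'time', if_not_exists => TRUE);
      SELECT add_retention_policy('sensor_data', INTERVAL '30 days', if_not_exists => TRUE);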

Database Initialization

Create Tables with PostgreSQL Features

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    table: events
    columns: ['event_id', 'event_type', 'metadata', 'created_at']
    args_mapping: |
      root = [
        this.id,
        this.type,
        this.data.encode("json"),
        now()
      ]
    init_statement: |
      CREATE TABLE IF NOT EXISTS events (
        id BIGSERIAL PRIMARY KEY,
        event_id UUID NOT NULL UNIQUE,
        event_type TEXT NOT NULL,
        metadata JSONB NOT NULL DEFAULT '{}',
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
      );

      CREATE INDEX IF NOT EXISTS idx_event_type ON events (event_type);
      CREATE INDEX IF NOT EXISTS idx_created_at ON events (created_at DESC);
      CREATE INDEX IF NOT EXISTS idx_metadata_gin ON events USING GIN (metadata);

Performance Tips

Use Batching for Bulk Inserts

Batching dramatically improves PostgreSQL insert performance:

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    table: events
    columns: ['data']
    args_mapping: 'root = [ this.encode("json") ]'
    batching:
      count: 1000
      period: 10s

Performance impact: 10-100x faster than individual inserts.

Connection Pooling

Configure the connection pool so concurrent pipelines don't exhaust the server's connection limit:

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable
    table: events
    columns: ['data']
    args_mapping: 'root = [ this ]'
    conn_max_open: 20
    conn_max_idle: 5
    conn_max_idle_time: 5m
    conn_max_life_time: 30m

Use Appropriate Indexes

For JSONB queries, use GIN indexes:

CREATE INDEX idx_metadata_gin ON events USING GIN (metadata);

For time-series data, use BRIN indexes:

CREATE INDEX idx_created_at_brin ON events USING BRIN (created_at);

AWS RDS PostgreSQL

Connect with SSL

output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@my-rds-instance.region.rds.amazonaws.com:5432/mydb?sslmode=require
    table: events
    columns: ['data']
    args_mapping: 'root = [ this ]'

Use IAM Authentication

Connect to RDS using IAM database authentication:

output:
  sql_insert:
    driver: postgres
    dsn: postgres://iamuser@my-rds-instance.region.rds.amazonaws.com:5432/mydb
    table: events
    columns: ['data']
    args_mapping: 'root = [ this ]'
    iam_enabled: true
    region: us-east-1

Azure Database for PostgreSQL

Connect with Azure AD Authentication

input:
  sql_select:
    driver: postgres
    dsn: postgres://myuser@myserver.postgres.database.azure.com:5432/db?sslmode=require
    table: users
    columns: ['id', 'email']
    azure:
      entra_enabled: true
      token_request_options:
        tenant_id: "your-tenant-id"

Troubleshooting

Connection Refused

Error: dial tcp: connection refused

Solutions:

  • Verify PostgreSQL is running: systemctl status postgresql
  • Check PostgreSQL is listening: netstat -tlnp | grep 5432
  • Ensure firewall allows connections
  • For remote connections, edit postgresql.conf:
    listen_addresses = '*'
  • And pg_hba.conf:
    host all all 0.0.0.0/0 md5

SSL Connection Error

Error: SSL is not enabled on the server

Solution: Use sslmode=disable for local/development:

dsn: postgres://user:pass@localhost:5432/db?sslmode=disable

Password Authentication Failed

Error: password authentication failed

Solutions:

  • Verify username and password
  • Check pg_hba.conf allows password authentication:
    host all all 0.0.0.0/0 md5
  • Reload PostgreSQL: pg_ctl reload

Too Many Connections

Error: remaining connection slots are reserved

Solutions:

  • Reduce conn_max_open in your pipeline
  • Increase max_connections in postgresql.conf:
    max_connections = 200
  • Restart PostgreSQL

Next Steps