sql_insert
Inserts rows into an SQL database for each message, and leaves the message unchanged.
Introduced in version 1.0.0.
# Common config fields, showing default values
label: ""
sql_insert:
  driver: "" # No default (required)
  dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60 # No default (required)
  table: foo # No default (required)
  columns: [] # No default (required)
  args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (required)
# All config fields, showing default values
label: ""
sql_insert:
  driver: "" # No default (required)
  dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60 # No default (required)
  table: foo # No default (required)
  columns: [] # No default (required)
  args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (required)
  prefix: "" # No default (optional)
  suffix: ON CONFLICT (name) DO NOTHING # No default (optional)
  init_files: [] # No default (optional)
  init_statement: | # No default (optional)
    CREATE TABLE IF NOT EXISTS some_table (
      foo varchar(50) not null,
      bar integer,
      baz varchar(50),
      primary key (foo)
    ) WITHOUT ROWID;
  init_verify_conn: false
  conn_max_idle_time: "" # No default (optional)
  conn_max_life_time: "" # No default (optional)
  conn_max_idle: 2
  conn_max_open: 0 # No default (optional)
  secret_name: "" # No default (optional)
  iam_enabled: false
  azure:
    entra_enabled: false
    token_request_options:
      claims: ""
      enable_cae: false
      scopes:
        - https://ossrdbms-aad.database.windows.net/.default
      tenant_id: ""
  region: ""
  endpoint: ""
  credentials:
    profile: ""
    id: ""
    secret: ""
    token: ""
    from_ec2_role: false
    role: ""
    role_external_id: ""
If the insert fails to execute, the message still remains unchanged and is flagged with the error, which can be caught using the standard error handling methods.
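As a minimal sketch of catching a failed insert, the config below follows `sql_insert` with a `catch` processor that logs the error. The driver, DSN, table, and column names are placeholders chosen for illustration, and logging is only one possible way to handle the failure.

```yaml
pipeline:
  processors:
    - sql_insert:
        driver: mysql
        # Placeholder DSN, table, and columns (assumptions for illustration)
        dsn: foouser:foopassword@tcp(localhost:3306)/foodb
        table: footable
        columns: [ id, name ]
        args_mapping: root = [ this.user.id, this.user.name ]
    # Runs only for messages that failed a previous processor
    - catch:
        - log:
            level: ERROR
            message: "sql_insert failed: ${! error() }"
```

Because the message content is left unchanged on failure, the processors inside `catch` see the original payload and could also route it elsewhere instead of only logging it.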
Examples
- Table Insert (MySQL)
Here we insert rows into a database by populating the columns id, name and topic with values extracted from messages and metadata:
pipeline:
  processors:
    - sql_insert:
        driver: mysql
        dsn: foouser:foopassword@tcp(localhost:3306)/foodb
        table: footable
        columns: [ id, name, topic ]
        args_mapping: |
          root = [
            this.user.id,
            this.user.name,
            metadata("kafka_topic"),
          ]
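The same insert might be adapted to PostgreSQL with the `suffix` field, so that rows with a conflicting key are skipped. This is a sketch under assumptions: the DSN, the `sslmode` parameter, and the presence of a unique constraint on `id` are hypothetical and would need to match the actual target database.

```yaml
pipeline:
  processors:
    - sql_insert:
        driver: postgres
        # Hypothetical connection details, not taken from the original example
        dsn: postgres://foouser:foopassword@localhost:5432/foodb?sslmode=disable
        table: footable
        columns: [ id, name, topic ]
        args_mapping: |
          root = [
            this.user.id,
            this.user.name,
            metadata("kafka_topic"),
          ]
        # Appended to the generated INSERT; assumes a unique constraint on id
        suffix: ON CONFLICT (id) DO NOTHING
```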
Fields
driver
A database driver to use.
Type: string
Options: mysql, postgres, clickhouse, mssql, sqlite, oracle, snowflake, trino, gocosmos, spanner.
dsn
A Data Source Name to identify the target database.