Skip to content

sql

Executes an arbitrary SQL query for each message.

Introduced in version 3.65.0.

# Common config fields, showing default values
output:
label: ""
sql:
driver: "" # No default (required)
data_source_name: "" # No default (required)
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?); # No default (required)
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
max_in_flight: 64
batching:
count: 0
byte_size: 0
period: ""
check: ""

Alternatives

For basic inserts use the sql_insert output. For more complex queries use the sql_raw output.

Fields

driver

A database driver to use.

Type: string

Options: mysql , postgres , clickhouse , mssql , sqlite , oracle , snowflake , trino , gocosmos .

data_source_name

Data source name.

Type: string

query

The query to execute. The style of placeholder to use depends on the driver, some drivers require question marks (?) whereas others expect incrementing dollar signs ($1, $2, and so on) or colons (:1, :2 and so on). The style to use is outlined in this table:

DriverPlaceholder Style
clickhouseDollar sign
mysqlQuestion mark
postgresDollar sign
mssqlQuestion mark
sqliteQuestion mark
oracleColon
snowflakeQuestion mark
trinoQuestion mark
gocosmosColon

Type: string

# Examples
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);

args_mapping

An optional Bloblang mapping which should evaluate to an array of values matching in size to the number of placeholder arguments in the field query.

Type: string

# Examples
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]
args_mapping: root = [ meta("user.id") ]

max_in_flight

The maximum number of inserts to run in parallel.

Type: int

Default: 64

batching

Allows you to configure a batching policy.

Type: object

# Examples
batching:
byte_size: 5000
count: 0
period: 1s
batching:
count: 10
period: 1s
batching:
check: this.contains("END BATCH")
count: 0
period: 1m

batching.count

A number of messages at which the batch should be flushed. If 0 disables count based batching.

Type: int

Default: 0

batching.byte_size

An amount of bytes at which the batch should be flushed. If 0 disables size based batching.

Type: int

Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string

Default: ""

# Examples
period: 1s
period: 1m
period: 500ms

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples
check: this.type == "end_of_transaction"

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: array

# Examples
processors:
- archive:
format: concatenate
processors:
- archive:
format: lines
processors:
- archive:
format: json_array