sql_raw
Runs an arbitrary SQL query against a database and (optionally) returns the result as an array of objects, one for each row returned.
Introduced in version 3.65.0.
If the query fails to execute then the message will remain unchanged and the error can be caught using error handling methods.
Examples
The following example inserts rows into the table footable with the columns foo, bar and baz populated with values extracted from messages.
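The insert described above could be configured roughly as follows. This is a minimal sketch: the MySQL credentials, host, and database name in the DSN are placeholders, and it assumes the incoming messages are JSON documents with top-level fields foo, bar and baz.

```yaml
pipeline:
  processors:
    - sql_raw:
        driver: mysql
        # Placeholder credentials and address; replace with your own.
        dsn: foouser:foopassword@tcp(localhost:3306)/foodb
        # mysql uses question mark placeholders, one per value.
        query: "INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);"
        # Assumes each message carries foo, bar and baz at the top level.
        args_mapping: 'root = [ this.foo, this.bar, this.baz ]'
        # Discard the query result so the message passes through unchanged.
        exec_only: true
```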
Here we query a database for columns of footable that share a user_id with the message field user.id. A branch processor is used in order to insert the resulting array into the original message at the path foo_rows.
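A configuration along these lines would match that description. It is a sketch only: the PostgreSQL DSN is a placeholder, and sslmode=disable is included on the assumption that the target is a local instance without SSL.

```yaml
pipeline:
  processors:
    - branch:
        processors:
          - sql_raw:
              driver: postgres
              # Placeholder DSN; sslmode=disable is only suitable for local testing.
              dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
              # postgres uses incrementing dollar sign placeholders.
              query: "SELECT * FROM footable WHERE user_id = $1;"
              args_mapping: 'root = [ this.user.id ]'
        # The branch result (an array of row objects) is written to foo_rows
        # on the original message, leaving its other fields intact.
        result_map: 'root.foo_rows = this'
```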
Fields
driver
A database driver to use.
Type: string
Options: mysql, postgres, clickhouse, mssql, sqlite, oracle, snowflake, trino, gocosmos, spanner.
dsn
A Data Source Name to identify the target database.
Drivers
:driver-support: mysql=certified, postgres=certified, clickhouse=community, mssql=community, sqlite=certified, oracle=certified, snowflake=community, trino=community, gocosmos=community, spanner=community
The following is a list of supported drivers and their respective DSN formats (placeholder styles are listed under the query field):
|===
| Driver | Data Source Name Format

| clickhouse
| clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&...&paramN=valueN] (see https://github.com/ClickHouse/clickhouse-go#dsn)
| mysql
| [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
| postgres
| postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]
| mssql
| sqlserver://[user[:password]@][netloc][:port][?database=dbname&param1=value1&...]
| sqlite
| file:/path/to/filename.db[?param&=value1&...]
| oracle
| oracle://[username[:password]@][netloc][:port]/service_name?server=server2&server=server3
| snowflake
| username[:password]@account_identifier/dbname/schemaname[?param1=value&...&paramN=valueN]
| trino
| http[s]://user[:pass]@host[:port][?parameters] (see https://github.com/trinodb/trino-go-client#dsn-data-source-name)
| gocosmos
| AccountEndpoint=<cosmosdb-endpoint>;AccountKey=<cosmosdb-account-key>[;TimeoutMs=<timeout-in-ms>][;Version=<cosmosdb-api-version>][;DefaultDb/Db=<db-name>][;AutoId=<true/false>][;InsecureSkipVerify=<true/false>] (see https://pkg.go.dev/github.com/microsoft/gocosmos#readme-example-usage)
| spanner
| projects/[PROJECT]/instances/[INSTANCE]/databases/[DATABASE]
|===
Please note that the postgres driver enforces SSL by default. You can override this with the parameter sslmode=disable if required.
The snowflake driver supports multiple DSN formats. Please consult the docs for more details. For key pair authentication, the DSN has the following format: <snowflake_user>@<snowflake_account>/<db_name>/<schema_name>?warehouse=<warehouse>&role=<role>&authenticator=snowflake_jwt&privateKey=<base64_url_encoded_private_key>, where the value for the privateKey parameter can be constructed from an unencrypted RSA private key file rsa_key.p8 using openssl enc -d -base64 -in rsa_key.p8 | basenc --base64url -w0 (you can use gbasenc instead of basenc on OSX if you install coreutils via Homebrew). If you have a password-encrypted private key, you can decrypt it using openssl pkcs8 -in rsa_key_encrypted.p8 -out rsa_key.p8. Also, make sure fields such as the username are URL-encoded.
The gocosmos driver is still experimental, but it has support for hierarchical partition keys as well as cross-partition queries. Please refer to the SQL notes for details.
Type: string
query
The query to execute. The style of placeholder to use depends on the driver: some drivers require question marks (?), whereas others expect incrementing dollar signs ($1, $2, and so on) or colons (:1, :2, and so on). The style to use is outlined in this table:
Driver | Placeholder Style |
---|---|
clickhouse | Dollar sign |
mysql | Question mark |
postgres | Dollar sign |
mssql | Question mark |
sqlite | Question mark |
oracle | Colon |
snowflake | Question mark |
trino | Question mark |
gocosmos | Colon |
Type: string
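To make the three placeholder styles concrete, the sketch below writes the same lookup once per style. The processors are alternatives shown side by side for comparison, not a pipeline meant to run in sequence. All DSNs are placeholders, and the status column and this.status field are hypothetical.

```yaml
pipeline:
  processors:
    # Question mark style (mysql, mssql, sqlite, snowflake, trino).
    - sql_raw:
        driver: mysql
        dsn: foouser:foopassword@tcp(localhost:3306)/foodb
        query: "SELECT * FROM footable WHERE user_id = ? AND status = ?;"
        args_mapping: 'root = [ this.user.id, this.status ]'

    # Dollar sign style (postgres, clickhouse).
    - sql_raw:
        driver: postgres
        dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
        query: "SELECT * FROM footable WHERE user_id = $1 AND status = $2;"
        args_mapping: 'root = [ this.user.id, this.status ]'

    # Colon style (oracle, gocosmos).
    - sql_raw:
        driver: oracle
        dsn: oracle://foouser:foopass@localhost:1521/service_name
        query: "SELECT * FROM footable WHERE user_id = :1 AND status = :2"
        args_mapping: 'root = [ this.user.id, this.status ]'
```

In each case the args_mapping is identical; only the placeholder syntax in the query changes.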
unsafe_dynamic_query
Whether to enable interpolation functions in the query. Great care should be taken to ensure your queries are defended against injection attacks.
Type: bool
Default: false
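One way to use interpolation while limiting injection risk is to quote the interpolated identifier before it reaches the query. The sketch below assumes a PostgreSQL target and a hypothetical table_name metadata key set upstream; the DSN and the id and document fields are placeholders as well. Values still go through placeholders and args_mapping rather than interpolation.

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Wrap the table name in double quotes and double any embedded quotes,
        # so the interpolated identifier cannot terminate the statement early.
        meta table_name = "\"" + metadata("table_name").replace_all("\"", "\"\"") + "\""
    - sql_raw:
        driver: postgres
        dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
        unsafe_dynamic_query: true
        # Only the identifier is interpolated; values use $1 and $2.
        query: 'INSERT INTO ${! metadata("table_name") } (id, document) VALUES ($1, $2);'
        args_mapping: 'root = [ this.id, this.document.string() ]'
        exec_only: true
```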
args_mapping
An optional Bloblang mapping which should evaluate to an array of values matching in size to the number of placeholder arguments in the field query.
Type: string
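As an illustration of keeping the array length in step with the placeholders, the sketch below maps three values onto three question marks. The field names, the topic column, the "unknown" fallback, and the kafka_topic metadata key are all hypothetical.

```yaml
pipeline:
  processors:
    - sql_raw:
        driver: mysql
        dsn: foouser:foopassword@tcp(localhost:3306)/foodb
        # Three placeholders, so the mapping must yield a three-element array.
        query: "INSERT INTO footable (foo, bar, topic) VALUES (?, ?, ?);"
        args_mapping: |
          root = [
            this.foo,
            this.bar.or("unknown"),
            metadata("kafka_topic")
          ]
        exec_only: true
```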
exec_only
Whether the query result should be discarded. When set to true the message contents will remain unchanged, which is useful in cases where you are executing inserts, updates, etc. By default this is true for the last query, and previous queries don’t change the results. If set to true for any query but the last one, the subsequent args_mapping input is overwritten.
Type: bool
queries
A list of statements to run in addition to query. When specifying multiple statements, they are all executed within a transaction. The output of the processor is always the last query that runs, unless exec_only is used.
Type: array
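A multi-statement configuration might look like the sketch below, which writes a row and then reads back a count within the same transaction. It assumes the top-level query field may be omitted when queries is set; the DSN, the column names, and the total alias are placeholders.

```yaml
pipeline:
  processors:
    - sql_raw:
        driver: postgres
        dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
        queries:
          # First statement: write the row.
          - query: "INSERT INTO footable (user_id, bar) VALUES ($1, $2);"
            args_mapping: 'root = [ this.user.id, this.bar ]'
          # Last statement: its rows become the processor output.
          - query: "SELECT count(*) AS total FROM footable WHERE user_id = $1;"
            args_mapping: 'root = [ this.user.id ]'
```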
queries[].query
The query to execute. The style of placeholder to use depends on the driver: some drivers require question marks (?), whereas others expect incrementing dollar signs ($1, $2, and so on) or colons (:1, :2, and so on). The style to use is outlined in this table:
Driver | Placeholder Style |
---|---|
clickhouse | Dollar sign |
mysql | Question mark |
postgres | Dollar sign |
mssql | Question mark |
sqlite | Question mark |
oracle | Colon |
snowflake | Question mark |
trino | Question mark |
gocosmos | Colon |
Type: string
queries[].args_mapping
An optional Bloblang mapping which should evaluate to an array of values matching in size to the number of placeholder arguments in the field query.
Type: string
queries[].exec_only
Whether the query result should be discarded. When set to true the message contents will remain unchanged, which is useful in cases where you are executing inserts, updates, etc. By default this is true for the last query, and previous queries don’t change the results. If set to true for any query but the last one, the subsequent args_mapping input is overwritten.
Type: bool
init_files
An optional list of file paths containing SQL statements to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Glob patterns are supported, including super globs (double star).
Care should be taken to ensure that the statements are idempotent, and therefore would not cause issues when run multiple times after service restarts. If both init_statement and init_files are specified the init_statement is executed after the init_files.
If a statement fails for any reason a warning log will be emitted but the operation of this component will not be stopped.
Type: array
Requires version 4.10.0 or newer
init_statement
An optional SQL statement to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Care should be taken to ensure that the statement is idempotent, and therefore would not cause issues when run multiple times after service restarts.
If both init_statement and init_files are specified the init_statement is executed after the init_files.
If the statement fails for any reason a warning log will be emitted but the operation of this component will not be stopped.
Type: string
Requires version 4.10.0 or newer
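The two initialisation fields can be combined as in the sketch below. The SQLite file path, the ./init directory, and the table definition are placeholders. CREATE TABLE IF NOT EXISTS is used so the statement stays idempotent across restarts.

```yaml
pipeline:
  processors:
    - sql_raw:
        driver: sqlite
        dsn: file:/path/to/filename.db
        # Files matching the glob run first on the initial connection.
        init_files:
          - ./init/*.sql
        # Runs after init_files; safe to repeat thanks to IF NOT EXISTS.
        init_statement: |
          CREATE TABLE IF NOT EXISTS footable (
            foo varchar(50) not null,
            bar integer,
            baz varchar(50),
            primary key (foo)
          );
        query: "INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);"
        args_mapping: 'root = [ this.foo, this.bar, this.baz ]'
        exec_only: true
```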
conn_max_idle_time
An optional maximum amount of time a connection may be idle. Expired connections may be closed lazily before reuse. If value <= 0, connections are not closed due to a connection's idle time.
Type: string
conn_max_life_time
An optional maximum amount of time a connection may be reused. Expired connections may be closed lazily before reuse. If value <= 0, connections are not closed due to a connection's age.
Type: string
conn_max_idle
An optional maximum number of connections in the idle connection pool. If conn_max_open is greater than 0 but less than the new conn_max_idle, then the new conn_max_idle will be reduced to match the conn_max_open limit. If value <= 0, no idle connections are retained. The default max idle connections is currently 2. This may change in a future release.
Type: int
Default: 2
conn_max_open
An optional maximum number of open connections to the database. If conn_max_idle is greater than 0 and the new conn_max_open is less than conn_max_idle, then conn_max_idle will be reduced to match the new conn_max_open limit. If value <= 0, then there is no limit on the number of open connections. The default is 0 (unlimited).
Type: int
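The four connection pool fields could be set together as in this sketch. The specific limits are arbitrary illustrations, not recommendations, and it assumes the two time-based fields accept duration strings such as 5m and 1h.

```yaml
pipeline:
  processors:
    - sql_raw:
        driver: postgres
        dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
        query: "SELECT * FROM footable WHERE user_id = $1;"
        args_mapping: 'root = [ this.user.id ]'
        # At most 10 connections open at once.
        conn_max_open: 10
        # Keep up to 5 idle connections; kept below conn_max_open so it is not reduced.
        conn_max_idle: 5
        # Assumed duration syntax: close connections idle for over 5 minutes.
        conn_max_idle_time: 5m
        # Assumed duration syntax: retire connections after 1 hour of use.
        conn_max_life_time: 1h
```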