gcp_bigquery

Sends messages as new rows to a Google Cloud BigQuery table.

Introduced in version 3.55.0.

# Common config fields, showing default values
output:
  label: ""
  gcp_bigquery:
    project: ""
    job_project: ""
    dataset: "" # No default (required)
    table: "" # No default (required)
    format: NEWLINE_DELIMITED_JSON
    max_in_flight: 64
    job_labels: {}
    credentials_json: ""
    csv:
      header: []
      field_delimiter: ','
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""

Credentials

By default, Redpanda Connect uses a shared credentials file when connecting to GCP services. You can find out more in the GCP guide (xref:guides:cloud/gcp.adoc[]).

Format

This output currently supports only the CSV, NEWLINE_DELIMITED_JSON, and PARQUET formats. See the Google Cloud BigQuery documentation for details on loading data in each format.

Each message may contain multiple elements separated by newlines. For example, a single message containing:

{"key": "1"}
{"key": "2"}

Is equivalent to two separate messages:

{"key": "1"}

And:

{"key": "2"}

The same is true for the CSV format.

CSV

For the CSV format, when the field csv.header is specified, a header row is inserted as the first line of each message batch. If this field is not provided, the first message of each message batch must include a header line.
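
As a minimal sketch of the first case, the fragment below sets csv.header so that each batch starts with a header row. The project, dataset, table, and column names are placeholders chosen for illustration and do not come from this page.

```yaml
output:
  gcp_bigquery:
    project: my-project   # hypothetical project ID
    dataset: my_dataset   # hypothetical dataset ID
    table: my_table       # hypothetical table name
    format: CSV
    csv:
      # Inserted as the first line of each message batch.
      header: [ id, name, created_at ]
```

With a config like this, each message would carry data rows only, for example `1,alice,2024-01-01`.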

Parquet

For Parquet, the data can be encoded using the parquet_encode processor. Each message sent to the output must be a complete Parquet message.
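
One way this might be wired up is to run parquet_encode as a batch processor on the output, so that each flushed batch becomes a single Parquet message. This is a sketch under assumptions: the dataset, table, and column names are hypothetical, and the schema layout shown for parquet_encode should be checked against that processor's own reference page.

```yaml
output:
  gcp_bigquery:
    dataset: my_dataset   # hypothetical dataset ID
    table: my_table       # hypothetical table name
    format: PARQUET
    batching:
      count: 1000
      period: 10s
      processors:
        # Encode the whole batch into one Parquet message.
        # The schema below is an assumed example, not a required shape.
        - parquet_encode:
            schema:
              - name: id
                type: INT64
              - name: name
                type: UTF8
```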

Performance

This output performs better when multiple messages are sent in flight in parallel. You can tune the maximum number of in-flight messages (or message batches) with the field max_in_flight.

This output also performs better when messages are sent as batches. Batches can be formed at both the input and output level. You can find out more in the message batching documentation.
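
The two settings can be combined. The fragment below is an illustrative starting point only: the numbers are arbitrary assumptions rather than recommendations, and the dataset and table names are placeholders.

```yaml
output:
  gcp_bigquery:
    dataset: my_dataset   # hypothetical dataset ID
    table: my_table       # hypothetical table name
    # Up to 16 batches may be in flight at once.
    max_in_flight: 16
    batching:
      count: 500          # flush after 500 messages...
      byte_size: 5000000  # ...or roughly 5 MB...
      period: 5s          # ...or 5 seconds, whichever comes first
```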

Fields

project

The project ID of the dataset to insert data to. If not set, it will be inferred from the credentials or read from the GOOGLE_CLOUD_PROJECT environment variable.

Type: string

Default: ""

job_project

The project ID in which jobs will be executed. If not set, project will be used.

Type: string

Default: ""
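
As a sketch of how project and job_project relate, the fragment below writes to a dataset owned by one project while running the jobs in another. Both project IDs, the dataset, and the table are hypothetical names.

```yaml
output:
  gcp_bigquery:
    project: data-warehouse-prod   # hypothetical: project that owns the dataset
    job_project: etl-jobs-prod     # hypothetical: project in which jobs run
    dataset: my_dataset            # hypothetical dataset ID
    table: my_table                # hypothetical table name
```

If job_project were omitted here, jobs would run in data-warehouse-prod.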

dataset

The BigQuery Dataset ID.

Type: string

table

The table to insert messages to.

Type: string

format

The format of each incoming message.

Type: string

Default: "NEWLINE_DELIMITED_JSON"

Options: NEWLINE_DELIMITED_JSON, CSV, PARQUET.

max_in_flight

The maximum number of message batches to have in flight at a given time. Increase this to improve throughput.

Type: int

Default: 64

write_disposition

Specifies how existing data in a destination table is treated.

Type: string

Default: "WRITE_APPEND"

Options: WRITE_APPEND, WRITE_EMPTY, WRITE_TRUNCATE.

create_disposition

Specifies the circumstances under which the destination table will be created. With CREATE_IF_NEEDED, GCP BigQuery creates the table if it does not already exist; tables are created atomically on successful completion of a job. With CREATE_NEVER, the table must already exist and will not be created automatically.

Type: string

Default: "CREATE_IF_NEEDED"

Options: CREATE_IF_NEEDED, CREATE_NEVER.
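
For illustration, a pipeline that must only ever append to a table managed elsewhere could set both dispositions explicitly. The dataset and table names are placeholders.

```yaml
output:
  gcp_bigquery:
    dataset: my_dataset   # hypothetical dataset ID
    table: my_table       # hypothetical table name
    # Add rows to whatever the table already contains.
    write_disposition: WRITE_APPEND
    # Never create the table; it must already exist.
    create_disposition: CREATE_NEVER
```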

ignore_unknown_values

Causes values not matching the schema to be tolerated. Unknown values are ignored. For CSV this ignores extra values at the end of a line. For JSON this ignores named values that do not match any column name. If this field is set to false (the default value), records containing unknown values are treated as bad records. The max_bad_records field can be used to customize how bad records are handled.

Type: bool

Default: false

max_bad_records

The maximum number of bad records that will be ignored when reading data.

Type: int

Default: 0
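
A hedged sketch of how ignore_unknown_values and max_bad_records might be used together: unknown values are ignored, and a small number of otherwise bad records is tolerated. The threshold of 10 is an arbitrary assumption, and the dataset and table names are placeholders.

```yaml
output:
  gcp_bigquery:
    dataset: my_dataset   # hypothetical dataset ID
    table: my_table       # hypothetical table name
    # Ignore values that do not match the schema instead of
    # treating the record as bad.
    ignore_unknown_values: true
    # Tolerate up to 10 bad records (illustrative value).
    max_bad_records: 10
```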

auto_detect

Indicates whether the options and schema for CSV and JSON sources should be inferred automatically. If the table doesn’t exist and this field is set to false, the output may not be able to insert data and will return an insertion error. Use this field with care: it delegates schema detection to the GCP BigQuery service, and values such as "no" may be treated as booleans in the CSV format.

Type: bool

Default: false

job_labels

A list of labels to add to the load job.

Type: object

Default: {}
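
Because the field is an object, labels are written as key/value pairs. The keys and values below are invented for illustration, as are the dataset and table names.

```yaml
output:
  gcp_bigquery:
    dataset: my_dataset   # hypothetical dataset ID
    table: my_table       # hypothetical table name
    job_labels:
      team: data-platform      # hypothetical label
      pipeline: orders-ingest  # hypothetical label
```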

credentials_json

An optional field to set Google Service Account credentials as JSON.

Type: string

Default: ""
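
To keep the secret out of the config file, the value could be supplied through environment variable interpolation, as sketched below. The variable name GCP_CREDENTIALS_JSON is an assumption, and this page does not state the exact encoding the field expects, so confirm that in the GCP guide before relying on it.

```yaml
output:
  gcp_bigquery:
    dataset: my_dataset   # hypothetical dataset ID
    table: my_table       # hypothetical table name
    # Resolved from the environment when the config is loaded;
    # the variable name is hypothetical.
    credentials_json: "${GCP_CREDENTIALS_JSON}"
```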

csv

Specify how CSV data should be interpreted.

Type: object

csv.header

A list of values to use as the header for each batch of messages. If not specified, the first line of each message will be used as the header.

Type: array

Default: []

csv.field_delimiter

The separator for fields in a CSV file, used when reading or exporting data.

Type: string

Default: ","

csv.allow_jagged_rows

Causes missing trailing optional columns to be tolerated when reading CSV data. Missing values are treated as nulls.

Type: bool

Default: false

csv.allow_quoted_newlines

Sets whether quoted data sections containing newlines are allowed when reading CSV data.

Type: bool

Default: false

csv.encoding

The character encoding of the data to be read.

Type: string

Default: "UTF-8"

Options: UTF-8, ISO-8859-1.

csv.skip_leading_rows

The number of rows at the top of a CSV file that BigQuery will skip when reading data. The default value is 1 since Redpanda Connect will add the specified header in the first line of each batch sent to BigQuery.

Type: int

Default: 1
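
Putting the csv fields together, a semicolon-delimited feed with optional trailing columns might be configured roughly as follows. All names and the choice of delimiter are illustrative assumptions.

```yaml
output:
  gcp_bigquery:
    dataset: my_dataset   # hypothetical dataset ID
    table: my_table       # hypothetical table name
    format: CSV
    csv:
      header: [ id, name, note ]   # hypothetical column names
      field_delimiter: ';'
      # Rows may omit trailing optional columns; missing values become nulls.
      allow_jagged_rows: true
      # Quoted fields may contain newlines.
      allow_quoted_newlines: true
      encoding: UTF-8
      # Skip the header row that is added to each batch.
      skip_leading_rows: 1
```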

batching

Allows you to configure a batching policy.

Type: object

# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.count

The number of messages at which the batch should be flushed. If 0, count-based batching is disabled.

Type: int

Default: 0

batching.byte_size

The number of bytes at which the batch should be flushed. If 0, size-based batching is disabled.

Type: int

Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string

Default: ""

# Examples
period: 1s
period: 1m
period: 500ms

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples
check: this.type == "end_of_transaction"

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: array

# Examples
processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array