
aws_s3

Sends message parts as objects to an Amazon S3 bucket. Each object is uploaded to the path specified by the path field.

Introduced in version 3.36.0.

# Common config fields, showing default values
output:
  label: ""
  aws_s3:
    bucket: "" # No default (required)
    path: ${!counter()}-${!timestamp_unix_nano()}.txt
    tags: {}
    content_type: application/octet-stream
    metadata:
      exclude_prefixes: []
    max_in_flight: 64
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""

To give each object a different path, use the function interpolations described in Bloblang queries. These are evaluated per message of a batch.
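For example, you could partition objects by date and give each one a unique name. This is a sketch only. The `logs/` prefix is a hypothetical layout, and the example assumes the Bloblang `now()` and `uuid_v4()` functions and the `ts_format` method, none of which are described on this page.

```yaml
output:
  aws_s3:
    bucket: TODO
    # Hypothetical layout: logs/YYYY/MM/DD/<random UUID>.json
    # The interpolations are evaluated per message, so each object gets its own path.
    path: logs/${!now().ts_format("2006/01/02")}/${!uuid_v4()}.json
```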

Metadata

Metadata fields on messages are sent as headers. To mutate these values (or remove them), see the metadata docs.
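You can stop some metadata from being attached to objects by excluding it by prefix with `metadata.exclude_prefixes`. This sketch assumes the input adds metadata keys beginning with `kafka_`, as the Kafka inputs do. Adjust the prefixes to match your own pipeline.

```yaml
output:
  aws_s3:
    bucket: TODO
    path: ${!counter()}-${!timestamp_unix_nano()}.json
    metadata:
      # Metadata keys starting with any of these prefixes are not sent with the object.
      exclude_prefixes:
        - kafka_
```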

Tags

The tags field allows you to specify key/value pairs to attach to objects as tags, where the values support interpolation functions:

output:
  aws_s3:
    bucket: TODO
    path: ${!counter()}-${!timestamp_unix_nano()}.tar.gz
    tags:
      Key1: Value1
      Timestamp: ${!meta("Timestamp")}

Credentials

By default, Redpanda Connect uses a shared credentials file when connecting to AWS services. You can also set credentials explicitly at the component level, which allows you to transfer data across accounts. For more information, see the AWS cloud guide.
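The following sketches cross-account writes. It assumes the credentials available to Redpanda Connect are allowed to assume a role in the account that owns the bucket. The region, role ARN, and external ID are hypothetical placeholders.

```yaml
output:
  aws_s3:
    bucket: TODO
    path: ${!counter()}-${!timestamp_unix_nano()}.json
    region: eu-west-1
    credentials:
      # Hypothetical role in the account that owns the bucket.
      role: arn:aws:iam::123456789012:role/example-s3-writer
      # Only needed if the role's trust policy requires an external ID (hypothetical value).
      role_external_id: example-external-id
```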

Batching

It’s common to want to upload messages to S3 as batched archives. The easiest way to do this is to batch your messages at the output level and join the batch of messages with an archive and/or compress processor.

For example, to upload messages as a .tar.gz archive of documents, we could use the following config:

output:
  aws_s3:
    bucket: TODO
    path: ${!counter()}-${!timestamp_unix_nano()}.tar.gz
    batching:
      count: 100
      period: 10s
      processors:
        - archive:
            format: tar
        - compress:
            algorithm: gzip

Alternatively, to upload JSON documents as a single large document containing an array of objects, we can use:

output:
  aws_s3:
    bucket: TODO
    path: ${!counter()}-${!timestamp_unix_nano()}.json
    batching:
      count: 100
      processors:
        - archive:
            format: json_array

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the maximum number of in-flight messages (or message batches) with the max_in_flight field.
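For example, you might lower `max_in_flight` to cap concurrent uploads while still batching, perhaps to reduce pressure on a heavily used bucket. The value 16 is illustrative, not a recommendation. The right number depends on message size, upload latency, and your S3 request rate limits.

```yaml
output:
  aws_s3:
    bucket: TODO
    path: ${!counter()}-${!timestamp_unix_nano()}.json
    # Illustrative value: at most 16 uploads (messages or batches) in flight at once.
    max_in_flight: 16
    batching:
      count: 100
      period: 5s
      processors:
        - archive:
            format: json_array
```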

Fields

bucket

The bucket to upload messages to.

Type: string

path

The path of each message to upload. This field supports interpolation functions.

Type: string

Default: "${!counter()}-${!timestamp_unix_nano()}.txt"

# Examples
path: ${!counter()}-${!timestamp_unix_nano()}.txt
path: ${!meta("kafka_key")}.json
path: ${!json("doc.namespace")}/${!json("doc.id")}.json

tags

Key/value pairs to store with the object as tags. This field supports interpolation functions.

Type: object

Default: {}

# Examples
tags:
  Key1: Value1
  Timestamp: ${!meta("Timestamp")}

content_type

The content type to set for each object. This field supports interpolation functions.

Type: string

Default: "application/octet-stream"

content_encoding

An optional content encoding to set for each object. This field supports interpolation functions.

Type: string

Default: ""
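A common pairing is to gzip each object with a `compress` processor and set `content_encoding` so that HTTP clients reading the object know the bytes are compressed. This is a sketch. Whether a given client decompresses transparently depends on that client.

```yaml
output:
  aws_s3:
    bucket: TODO
    path: ${!counter()}-${!timestamp_unix_nano()}.json.gz
    content_type: application/json
    # Tells readers that the stored bytes are gzip-compressed JSON.
    content_encoding: gzip
    batching:
      count: 100
      period: 10s
      processors:
        - archive:
            format: json_array
        - compress:
            algorithm: gzip
```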

cache_control

The cache control to set for each object. This field supports interpolation functions.

Type: string

Default: ""

content_disposition

The content disposition to set for each object. This field supports interpolation functions.

Type: string

Default: ""

content_language

The content language to set for each object. This field supports interpolation functions.

Type: string

Default: ""

content_md5

The content MD5 to set for each object. This field supports interpolation functions.

Type: string

Default: ""

website_redirect_location

The website redirect location to set for each object. This field supports interpolation functions.

Type: string

Default: ""

metadata

Specify criteria for which metadata values are attached to objects as headers.

Type: object

metadata.exclude_prefixes

Provide a list of explicit metadata key prefixes to be excluded when adding metadata to sent messages.

Type: array

Default: []

storage_class

The storage class to set for each object. This field supports interpolation functions.

Type: string

Default: "STANDARD"

Options: STANDARD, REDUCED_REDUNDANCY, GLACIER, STANDARD_IA, ONEZONE_IA, INTELLIGENT_TIERING, DEEP_ARCHIVE.
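Because `storage_class` supports interpolation, the class can be chosen per message. This sketch assumes a hypothetical `storage_class` metadata key set earlier in the pipeline. It relies on the Bloblang `or` method to fall back to `STANDARD_IA` when the key is absent.

```yaml
output:
  aws_s3:
    bucket: TODO
    path: ${!counter()}-${!timestamp_unix_nano()}.json
    # Hypothetical metadata key; falls back to STANDARD_IA when the key is missing.
    storage_class: ${!meta("storage_class").or("STANDARD_IA")}
```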

kms_key_id

An optional key ID to use for server-side encryption.

Type: string

Default: ""

checksum_algorithm

The algorithm used to create the checksum for each object.

Type: string

Default: ""

Options: CRC32, CRC32C, SHA1, SHA256.
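For example, to have a SHA-256 checksum computed for each uploaded object, using one of the listed options:

```yaml
output:
  aws_s3:
    bucket: TODO
    path: ${!counter()}-${!timestamp_unix_nano()}.json
    # Must be one of CRC32, CRC32C, SHA1 or SHA256.
    checksum_algorithm: SHA256
```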

server_side_encryption

An optional server-side encryption algorithm.

Type: string

Default: ""

Requires version 3.63.0 or newer.
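The following sketches server-side encryption with a KMS key. It sets `server_side_encryption` to `aws:kms`, one of the values S3 accepts for its server-side encryption setting, and supplies the key in `kms_key_id`. The key ARN is a hypothetical placeholder. The writing identity is assumed to have permission to use that key.

```yaml
output:
  aws_s3:
    bucket: TODO
    path: ${!counter()}-${!timestamp_unix_nano()}.json
    # Server-side encryption with AWS KMS (requires version 3.63.0 or newer).
    server_side_encryption: aws:kms
    # Hypothetical KMS key ARN.
    kms_key_id: arn:aws:kms:eu-west-1:123456789012:key/00000000-0000-0000-0000-000000000000
```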

force_path_style_urls

Forces the client API to use path-style URLs, which helps when connecting to custom endpoints.

Type: bool

Default: false

max_in_flight

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

Type: int

Default: 64

timeout

The maximum period to wait on an upload before abandoning it and reattempting.

Type: string

Default: "5s"
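The default five-second timeout may be too short when uploading large batched archives over a slow link. The following sketch raises it. The `30s` value is illustrative, not a recommendation.

```yaml
output:
  aws_s3:
    bucket: TODO
    path: ${!counter()}-${!timestamp_unix_nano()}.tar.gz
    # Abandon and reattempt an upload only after 30 seconds.
    timeout: 30s
    batching:
      count: 1000
      period: 1m
      processors:
        - archive:
            format: tar
        - compress:
            algorithm: gzip
```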

batching

Allows you to configure a batching policy.

Type: object

# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.count

The number of messages at which the batch should be flushed. If 0, count-based batching is disabled.

Type: int

Default: 0

batching.byte_size

The number of bytes at which the batch should be flushed. If 0, size-based batching is disabled.

Type: int

Default: 0
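For example, to aim for objects of roughly 5 MB, you can combine `byte_size` with a `period` so that a slow trickle of messages is still flushed. This is a sketch and the threshold is illustrative. It assumes the size is measured on the accumulated messages before any batching `processors` run, so a compressed object will usually be smaller than the threshold.

```yaml
output:
  aws_s3:
    bucket: TODO
    path: ${!counter()}-${!timestamp_unix_nano()}.jsonl.gz
    batching:
      # Flush once the accumulated messages reach about 5 MB...
      byte_size: 5000000
      # ...or after one minute, whichever comes first.
      period: 1m
      processors:
        - archive:
            format: lines
        - compress:
            algorithm: gzip
```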

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string

Default: ""

# Examples
period: 1s
period: 1m
period: 500ms

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples
check: this.type == "end_of_transaction"

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Note that all resulting messages are flushed as a single batch. Splitting the batch into smaller batches with these processors is therefore a no-op.

Type: array

# Examples
processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

region

The AWS region to target.

Type: string

Default: ""

endpoint

Allows you to specify a custom endpoint for the AWS API.

Type: string

Default: ""
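The following sketches writing to an S3-compatible store, such as a locally hosted object store. It points `endpoint` at a custom URL and enables `force_path_style_urls`. The URL, region, and static credentials are hypothetical placeholders for a local test setup.

```yaml
output:
  aws_s3:
    bucket: TODO
    path: ${!counter()}-${!timestamp_unix_nano()}.json
    # Hypothetical local S3-compatible endpoint.
    endpoint: http://localhost:9000
    # Address objects as http://host/bucket/key instead of using a bucket subdomain.
    force_path_style_urls: true
    region: us-east-1
    credentials:
      # Hypothetical static credentials for the local store.
      id: example-id
      secret: example-secret
```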

credentials

Optional manual configuration of AWS credentials to use. For more information, see the AWS cloud guide.

Type: object

credentials.profile

A profile from ~/.aws/credentials to use.

Type: string

Default: ""

credentials.id

The ID of credentials to use.

Type: string

Default: ""

credentials.secret

The secret for the credentials being used.

Type: string

Default: ""

credentials.token

The token for the credentials being used, required when using short-term credentials.

Type: string

Default: ""
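Short-term credentials usually need `id`, `secret`, and `token` set together. This sketch reads them from environment variables using config-level `${VAR}` interpolation. The variable names are the conventional AWS ones and are assumed to be set in the process environment.

```yaml
output:
  aws_s3:
    bucket: TODO
    path: ${!counter()}-${!timestamp_unix_nano()}.json
    credentials:
      # Resolved from the environment when the config is loaded.
      id: ${AWS_ACCESS_KEY_ID}
      secret: ${AWS_SECRET_ACCESS_KEY}
      # Required for short-term credentials, e.g. those issued by AWS STS.
      token: ${AWS_SESSION_TOKEN}
```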

credentials.from_ec2_role

Use the credentials of a host EC2 machine configured to assume an IAM role associated with the instance.

Type: bool

Default: false

Requires version 4.2.0 or newer.

credentials.role

A role ARN to assume.

Type: string

Default: ""

credentials.role_external_id

An external ID to provide when assuming a role.

Type: string

Default: ""