elasticsearch_v8
Publishes messages into an Elasticsearch index. If the index does not exist then it is created with a dynamic mapping.
```yaml
# Common config fields, showing default values
output:
  label: ""
  elasticsearch_v8:
    urls: [] # No default (required)
    index: "" # No default (required)
    action: "" # No default (required)
    id: ${!counter()}-${!timestamp_unix()} # No default (required)
    max_in_flight: 64
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
```
```yaml
# Advanced config fields, showing default values
output:
  label: ""
  elasticsearch_v8:
    urls: [] # No default (required)
    index: "" # No default (required)
    action: "" # No default (required)
    id: ${!counter()}-${!timestamp_unix()} # No default (required)
    pipeline: ""
    routing: ""
    retry_on_conflict: 0
    tls:
      enabled: false
      skip_cert_verify: false
      enable_renegotiation: false
      root_cas: ""
      root_cas_file: ""
      client_certs: []
    max_in_flight: 64
    basic_auth:
      enabled: false
      username: ""
      password: ""
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
```
Both the `id` and `index` fields can be dynamically set using function interpolations described here. When sending batched messages these interpolations are performed per message part.
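For example, assuming each message carries `service` and `id` fields (illustrative names, not part of the component's schema), both fields can resolve per message part:

```yaml
output:
  elasticsearch_v8:
    urls: [localhost:9200]
    # Each message part is routed to an index derived from its own contents.
    index: logs-${! json("service") }
    id: ${! json("id") }
    action: index
```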
Performance
This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the maximum number of in-flight messages (or message batches) with the field `max_in_flight`.
This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in this doc.
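A minimal sketch combining both tunables with this output (the batch sizes shown are illustrative starting points, not recommendations):

```yaml
output:
  elasticsearch_v8:
    urls: [localhost:9200]
    index: foo
    id: ${! json("id") }
    action: index
    # Up to 64 batches dispatched in parallel.
    max_in_flight: 64
    # Flush a batch at 100 messages or after 5s, whichever comes first.
    batching:
      count: 100
      period: 5s
```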
Examples
When updating documents, the request body should contain a combination of `doc`, `upsert`, and/or `script` fields at the top level; this should be done via mapping processors. `doc` updates using a partial document, `script` performs an update using a scripting language such as the built-in Painless language, and `upsert` updates an existing document or inserts a new one if it doesn't exist. For more information on the structures and behaviors of these fields, please see the Elasticsearch Update API documentation.
```yaml
# Partial document update
output:
  processors:
    - mapping: |
        meta id = this.id
        # Performs a partial update on the document.
        root.doc = this
  elasticsearch_v8:
    urls: [localhost:9200]
    index: foo
    id: ${! @id }
    action: update
```
```yaml
# Scripted update
output:
  processors:
    - mapping: |
        meta id = this.id
        # Increments the field "counter" by 1.
        root.script.source = "ctx._source.counter += 1"
  elasticsearch_v8:
    urls: [localhost:9200]
    index: foo
    id: ${! @id }
    action: update
```
```yaml
# Upsert
output:
  processors:
    - mapping: |
        meta id = this.id
        # If the product with the ID exists, its price will be updated to 100.
        # If the product does not exist, a new document with ID 1 and a price
        # of 50 will be inserted.
        root.doc.product_price = 50
        root.upsert.product_price = 100
  elasticsearch_v8:
    urls: [localhost:9200]
    index: foo
    id: ${! @id }
    action: update
```
Here we read messages from a Redpanda cluster and write them to an Elasticsearch index using a field from the message as the ID for the Elasticsearch document.
```yaml
input:
  redpanda:
    seed_brokers: [localhost:19092]
    topics: ["things"]
    consumer_group: "rpcn3"
  processors:
    - mapping: |
        meta id = this.id
        root = this
output:
  elasticsearch_v8:
    urls: ['http://localhost:9200']
    index: "things"
    action: "index"
    id: ${! meta("id") }
```
Here we read messages from an AWS S3 bucket and write them to an Elasticsearch index, using the S3 key as the ID for the Elasticsearch document.
```yaml
input:
  aws_s3:
    bucket: "my-cool-bucket"
    prefix: "bug-facts/"
    scanner:
      to_the_end: {}
output:
  elasticsearch_v8:
    urls: ['http://localhost:9200']
    index: "cool-bug-facts"
    action: "index"
    id: ${! meta("s3_key") }
```
When using the `create` action, a new document will be created if the document ID does not already exist. If the document ID already exists, the operation will fail.
```yaml
output:
  elasticsearch_v8:
    urls: [localhost:9200]
    index: foo
    id: ${! json("id") }
    action: create
```
When using the `upsert` action, if the document ID already exists it will be updated; if it does not exist, a new document will be inserted. The request body should contain the document to be indexed.
```yaml
output:
  processors:
    - mapping: |
        meta id = this.id
        root = this.doc
  elasticsearch_v8:
    urls: [localhost:9200]
    index: foo
    id: ${! @id }
    action: upsert
```
Fields
urls
A list of URLs to connect to. If an item of the list contains commas, it will be expanded into multiple URLs.
Type: array
# Examples
```yaml
urls:
  - http://localhost:9200
```
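Since comma-separated items are expanded, the following is equivalent to listing each URL as its own item:

```yaml
# Expands into http://localhost:9200 and http://localhost:9201
urls:
  - http://localhost:9200,http://localhost:9201
```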
index
The index to place messages. This field supports interpolation functions.
Type: string
action
The action to take on the document. This field must resolve to one of the following action types: `index`, `update`, `delete`, `create` or `upsert`. See the Updating Documents example for more on how the `update` action works, and the Create Documents and Upserting Documents examples for how to use the `create` and `upsert` actions respectively.
This field supports interpolation functions.
Type: string
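Because the field supports interpolation functions, the action can also be resolved per message. A sketch assuming each message carries an `action` field naming one of the valid action types (the field name is illustrative):

```yaml
output:
  elasticsearch_v8:
    urls: [localhost:9200]
    index: foo
    id: ${! json("id") }
    # Resolves per message, e.g. to "index" or "delete".
    action: ${! json("action") }
```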
id
The ID for indexed messages. Interpolation should be used in order to create a unique ID for each message. This field supports interpolation functions.
Type: string
# Examples
id: ${!counter()}-${!timestamp_unix()}
pipeline
An optional pipeline ID to preprocess incoming documents. This field supports interpolation functions.
Type: string
Default: ""
routing
The routing key to use for the document. This field supports interpolation functions.
Type: string
Default: ""
retry_on_conflict
Specify how many times an update operation should be retried when a conflict occurs.
Type: int
Default: 0
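A sketch of a scripted update that tolerates concurrent writers by retrying on version conflicts (the retry count of 3 is an arbitrary choice):

```yaml
output:
  processors:
    - mapping: |
        meta id = this.id
        root.script.source = "ctx._source.counter += 1"
  elasticsearch_v8:
    urls: [localhost:9200]
    index: foo
    id: ${! @id }
    action: update
    # Retry the update up to 3 times if a version conflict occurs.
    retry_on_conflict: 3
```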
tls
Custom TLS settings can be used to override system defaults.
Type: object
tls.enabled
Whether custom TLS settings are enabled.
Type: bool
Default: false
tls.skip_cert_verify
Whether to skip server side certificate verification.
Type: bool
Default: false
tls.enable_renegotiation
Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you're seeing the error message `local error: tls: no renegotiation`.
Type: bool
Default: false
Requires version 3.45.0 or newer
tls.root_cas
An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.
Type: string
Default: ""
# Examples
```yaml
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----
```
tls.root_cas_file
An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.
Type: string
Default: ""
# Examples
root_cas_file: ./root_cas.pem
tls.client_certs
A list of client certificates to use. For each certificate either the fields `cert` and `key`, or `cert_file` and `key_file` should be specified, but not both.
Type: array
Default: []
# Examples
```yaml
client_certs:
  - cert: foo
    key: bar
```
```yaml
client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key
```
tls.client_certs[].cert
A plain text certificate to use.
Type: string
Default: ""
tls.client_certs[].key
A plain text certificate key to use.
Type: string
Default: ""
tls.client_certs[].cert_file
The path of a certificate to use.
Type: string
Default: ""
tls.client_certs[].key_file
The path of a certificate key to use.
Type: string
Default: ""
tls.client_certs[].password
A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete `pbeWithMD5AndDES-CBC` algorithm is not supported for the PKCS#8 format.
Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.
Type: string
Default: ""
# Examples
password: foo
password: ${KEY_PASSWORD}
max_in_flight
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
Type: int
Default: 64
basic_auth
Allows you to specify basic authentication.
Type: object
basic_auth.enabled
Whether to use basic authentication in requests.
Type: bool
Default: false
basic_auth.username
A username to authenticate as.
Type: string
Default: ""
basic_auth.password
A password to authenticate with.
Type: string
Default: ""
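A sketch of enabling basic authentication, resolving the password from an environment variable rather than storing it in the config (the variable name is illustrative):

```yaml
output:
  elasticsearch_v8:
    urls: [localhost:9200]
    index: foo
    id: ${! json("id") }
    action: index
    basic_auth:
      enabled: true
      username: elastic
      # ELASTIC_PASSWORD is an assumed environment variable.
      password: ${ELASTIC_PASSWORD}
```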
batching
Allows you to configure a batching policy.
Type: object
# Examples
```yaml
batching:
  byte_size: 5000
  count: 0
  period: 1s
```
```yaml
batching:
  count: 10
  period: 1s
```
```yaml
batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```
batching.count
A number of messages at which the batch should be flushed. Set to `0` to disable count-based batching.
Type: int
Default: 0
batching.byte_size
An amount of bytes at which the batch should be flushed. Set to `0` to disable size-based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
# Examples
period: 1s
period: 1m
period: 500ms
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples
check: this.type == "end_of_transaction"
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
Type: array
# Examples
```yaml
processors:
  - archive:
      format: concatenate
```
```yaml
processors:
  - archive:
      format: lines
```
```yaml
processors:
  - archive:
      format: json_array
```