timeplus
Sends message to a Timeplus Enterprise stream via ingest endpoint
# Common config fields, showing default valuesoutput: label: "" timeplus: target: timeplus url: https://us-west-2.timeplus.cloud workspace: "" # No default (optional) stream: "" # No default (required) apikey: "" # No default (optional) username: "" # No default (optional) password: "" # No default (optional) max_in_flight: 64 batching: count: 0 byte_size: 0 period: "" check: ""
# Advanced config fields, showing default valuesoutput: label: "" timeplus: target: timeplus url: https://us-west-2.timeplus.cloud workspace: "" # No default (optional) stream: "" # No default (required) apikey: "" # No default (optional) username: "" # No default (optional) password: "" # No default (optional) max_in_flight: 64 batching: count: 0 byte_size: 0 period: "" check: "" processors: [] # No default (optional)
This output can send message to Timeplus Enterprise Cloud, Timeplus Enterprise (self-hosted) or directly to timeplusd.
This output accepts structured message only. It also expects all message contains the same keys and matches the schema of the destination stream. If the upstream source or pipeline returns unstructured message such as string, please refer to the “Unstructured message” example.
Examples
You will need to create API Key on Timeplus Enterprise Cloud Web console first and then set the apikey
field.
yamloutput: timeplus: workspace: my_workspace_id stream: mystream apikey: <Your API Key>
For self-housted Timeplus Enterprise, you will need to specify the username and password as well as the URL of the App server
yamloutput: timeplus: url: http://localhost:8000 workspace: my_workspace_id stream: mystream username: username password: pw
This output writes to Timeplusd via HTTP so make sure you specify the HTTP port of the Timeplusd.
yamloutput: timeplus: url: http://localhost:3218 stream: mystream username: username password: pw
If the upstream source or pipeline returns unstructured message such as string, you can leverage the output processors to wrap it into a stucture message and then pass it to the output. This example create a strcutre mesasge with raw
field and store the original string content into this field. You can modify the name of this raw
field to whatever you want. Please make sure the destiation stream contains such field
yamloutput: timeplus: workspace: my_workspace_id stream: mystream apikey: <Api key genereated on web console>
processors: - mapping: | root = {} root.raw = content().string()
Fields
target
The destination type, either Timeplus Enterprise or timeplusd
Type: string
Default: "timeplus"
Options:
timeplus
, timeplusd
.
url
The url should always include schema and host.
Type: string
Default: "https://us-west-2.timeplus.cloud"
# Examples
url: http://localhost:8000
url: http://127.0.0.1:3218
workspace
ID of the workspace. Required if target is timeplus
.
Type: string
stream
The name of the stream. Make sure the schema of the stream matches the input
Type: string
apikey
The API key. Required if you are sending message to Timeplus Enterprise Cloud
Type: string
username
The username. Required if you are sending message to Timeplus Enterprise (self-hosted) or timeplusd
Type: string
password
The password. Required if you are sending message to Timeplus Enterprise (self-hosted) or timeplusd
Type: string
max_in_flight
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
Type: int
Default: 64
batching
Allows you to configure a batching policy.
Type: object
# Examples
batching: byte_size: 5000 count: 0 period: 1s
batching: count: 10 period: 1s
batching: check: this.contains("END BATCH") count: 0 period: 1m
batching.count
A number of messages at which the batch should be flushed. If 0
disables count based batching.
Type: int
Default: 0
batching.byte_size
An amount of bytes at which the batch should be flushed. If 0
disables size based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
# Examples
period: 1s
period: 1m
period: 500ms
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples
check: this.type == "end_of_transaction"
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
Type: array
# Examples
processors: - archive: format: concatenate
processors: - archive: format: lines
processors: - archive: format: json_array