azure_cosmosdb

Creates or updates messages as JSON documents in Azure CosmosDB.

Introduced in version v4.25.0.

# Common config fields, showing default values
output:
  label: ""
  azure_cosmosdb:
    endpoint: https://localhost:8081 # No default (optional)
    account_key: '!!!SECRET_SCRUBBED!!!' # No default (optional)
    connection_string: '!!!SECRET_SCRUBBED!!!' # No default (optional)
    database: testdb # No default (required)
    container: testcontainer # No default (required)
    partition_keys_map: root = "blobfish" # No default (required)
    operation: Create
    item_id: ${! json("id") } # No default (optional)
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
    max_in_flight: 64

When creating documents, each message must have the id property (case-sensitive) set, or auto_id: true must be used. This property uniquely identifies the document: no two documents within the same logical partition may share the same id. The id field must not exceed 255 characters.

The partition_keys_map mapping must resolve to the same value(s) across the entire message batch.

Credentials

You can use one of the following authentication mechanisms:

  • Set the endpoint field and the account_key field
  • Set only the endpoint field to use DefaultAzureCredential
  • Set the connection_string field
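
For example, a minimal sketch of the endpoint-only mechanism, where credentials are resolved through DefaultAzureCredential (the account endpoint, database and container names below are placeholders):

output:
  azure_cosmosdb:
    endpoint: https://myaccount.documents.azure.com:443 # account_key omitted, so DefaultAzureCredential is used
    database: testdb
    container: testcontainer
    partition_keys_map: root = "blobfish"
    operation: Create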

Batching

CosmosDB limits batches to a maximum of 100 messages, and the total payload must not exceed 2 MB.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages (or message batches) with the field max_in_flight.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level; see the batching documentation for more details.
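
As a minimal sketch combining these settings (the values are illustrative; the count of 100 matches the CosmosDB batch limit mentioned above):

output:
  azure_cosmosdb:
    endpoint: https://localhost:8081
    account_key: '!!!SECRET_SCRUBBED!!!'
    database: testdb
    container: testcontainer
    partition_keys_map: root = json("habitat")
    operation: Create
    max_in_flight: 64
    batching:
      count: 100 # stay within the 100-message batch limit
      period: 1s # flush incomplete batches after one second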

Examples

Create new documents in the blobfish container with partition key /habitat.

output:
  azure_cosmosdb:
    endpoint: http://localhost:8080
    account_key: C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==
    database: blobbase
    container: blobfish
    partition_keys_map: root = json("habitat")
    operation: Create
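
As a further illustration (not part of the original examples), switching the operation to Upsert would insert new documents or replace existing ones that share the same id; the rest of the config is unchanged:

output:
  azure_cosmosdb:
    endpoint: http://localhost:8080
    account_key: C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==
    database: blobbase
    container: blobfish
    partition_keys_map: root = json("habitat")
    operation: Upsert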

Fields

endpoint

CosmosDB endpoint.

Type: string

# Examples
endpoint: https://localhost:8081

account_key

Account key.

Type: string

# Examples
account_key: C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==

connection_string

Connection string.

Type: string

# Examples
connection_string: AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;

database

Database.

Type: string

# Examples
database: testdb

container

Container.

Type: string

# Examples
container: testcontainer

partition_keys_map

A Bloblang mapping which should evaluate to a single partition key value or an array of partition key values of type string, integer or boolean. Currently, hierarchical partition keys are not supported so only one value may be provided.

Type: string

# Examples
partition_keys_map: root = "blobfish"
partition_keys_map: root = 41
partition_keys_map: root = true
partition_keys_map: root = null
partition_keys_map: root = json("blobfish").depth

operation

Operation.

Type: string

Default: "Create"

Option   Summary
Create   Create operation.
Delete   Delete operation.
Patch    Patch operation.
Replace  Replace operation.
Upsert   Upsert operation.

patch_operations

Patch operations to be performed when operation: Patch.

Type: array

patch_operations[].operation

Operation.

Type: string

Default: "Add"

Option     Summary
Add        Add patch operation.
Increment  Increment patch operation.
Remove     Remove patch operation.
Replace    Replace patch operation.
Set        Set patch operation.

patch_operations[].path

Path.

Type: string

# Examples
path: /foo/bar/baz

patch_operations[].value_map

A Bloblang mapping which should evaluate to a value of any type that is supported by CosmosDB.

Type: string

# Examples
value_map: root = "blobfish"
value_map: root = 41
value_map: root = true
value_map: root = json("blobfish").depth
value_map: root = [1, 2, 3]

patch_condition

Patch operation condition. This field supports interpolation functions.

Type: string

# Examples
patch_condition: from c where not is_defined(c.blobfish)
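
A hedged sketch of how the patch-related fields might fit together (the paths and mappings here are illustrative placeholders, not from the original examples):

output:
  azure_cosmosdb:
    connection_string: '!!!SECRET_SCRUBBED!!!'
    database: testdb
    container: testcontainer
    partition_keys_map: root = json("habitat")
    operation: Patch
    patch_condition: from c where not is_defined(c.blobfish)
    patch_operations:
      - operation: Set
        path: /last_seen
        value_map: root = json("observed_at") # illustrative: copy a field from the message
      - operation: Increment
        path: /sightings
        value_map: root = 1 # illustrative: increment a numeric counter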

auto_id

Automatically set the item id field to a random UUID v4. If the id field is already set, then it will not be overwritten. Setting this to false can improve performance, since the messages will not have to be parsed.

Type: bool

Default: true
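
For instance, a hedged sketch of a config where incoming documents are expected to already carry an id property, so the automatic UUID can be switched off (field values are placeholders):

output:
  azure_cosmosdb:
    connection_string: '!!!SECRET_SCRUBBED!!!'
    database: testdb
    container: testcontainer
    partition_keys_map: root = json("habitat")
    operation: Create
    auto_id: false # documents must already contain a valid id property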

item_id

ID of item to replace or delete. Only used by the Replace and Delete operations. This field supports interpolation functions.

Type: string

# Examples
item_id: ${! json("id") }
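
A hedged sketch of a Delete configuration driven by the message contents (the interpolation and field names are illustrative placeholders):

output:
  azure_cosmosdb:
    connection_string: '!!!SECRET_SCRUBBED!!!'
    database: testdb
    container: testcontainer
    partition_keys_map: root = json("habitat")
    operation: Delete
    item_id: ${! json("id") } # the id of the document to delete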

batching

Allows you to configure a batching policy.

Type: object

# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.count

The number of messages at which the batch should be flushed. Set to 0 to disable count-based batching.

Type: int

Default: 0

batching.byte_size

The number of bytes at which the batch should be flushed. Set to 0 to disable size-based batching.

Type: int

Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string

Default: ""

# Examples
period: 1s
period: 1m
period: 500ms

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string

Default: ""

# Examples
check: this.type == "end_of_transaction"

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: array

# Examples
processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

max_in_flight

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

Type: int

Default: 64

CosmosDB emulator

If you wish to run the CosmosDB emulator referenced in this documentation, the following Docker command should do the trick:

> docker run --rm -it -p 8081:8081 --name=cosmosdb -e AZURE_COSMOS_EMULATOR_PARTITION_COUNT=10 -e AZURE_COSMOS_EMULATOR_ENABLE_DATA_PERSISTENCE=false mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator

Note: AZURE_COSMOS_EMULATOR_PARTITION_COUNT controls the number of partitions that will be supported by the emulator. The bigger the value, the longer it takes for the container to start up.

Additionally, instead of installing the container's self-signed certificate, which is exposed via https://localhost:8081/_explorer/emulator.pem, you can run mitmproxy (https://mitmproxy.org/) like so:

> mitmproxy -k --mode "reverse:https://localhost:8081"

Then you can access the CosmosDB UI via http://localhost:8080/_explorer/index.html and use http://localhost:8080 as the CosmosDB endpoint.