azure_cosmosdb
Executes a SQL query against Azure CosmosDB and creates a batch of messages from each page of items.
Introduced in version v4.25.0.
# Common config fields, showing default values
input:
  label: ""
  azure_cosmosdb:
    endpoint: https://localhost:8081 # No default (optional)
    account_key: '!!!SECRET_SCRUBBED!!!' # No default (optional)
    connection_string: '!!!SECRET_SCRUBBED!!!' # No default (optional)
    database: testdb # No default (required)
    container: testcontainer # No default (required)
    partition_keys_map: root = "blobfish" # No default (required)
    query: SELECT c.foo FROM testcontainer AS c WHERE c.bar = "baz" AND c.timestamp < @timestamp # No default (required)
    args_mapping: |- # No default (optional)
      root = [
        { "Name": "@name", "Value": "benthos" },
      ]
    auto_replay_nacks: true
# Advanced config fields, showing default values
input:
  label: ""
  azure_cosmosdb:
    endpoint: https://localhost:8081 # No default (optional)
    account_key: '!!!SECRET_SCRUBBED!!!' # No default (optional)
    connection_string: '!!!SECRET_SCRUBBED!!!' # No default (optional)
    database: testdb # No default (required)
    container: testcontainer # No default (required)
    partition_keys_map: root = "blobfish" # No default (required)
    query: SELECT c.foo FROM testcontainer AS c WHERE c.bar = "baz" AND c.timestamp < @timestamp # No default (required)
    args_mapping: |- # No default (optional)
      root = [
        { "Name": "@name", "Value": "benthos" },
      ]
    batch_count: -1
    auto_replay_nacks: true
Cross-partition queries
Cross-partition queries are currently not supported by the underlying driver. For every query, the PartitionKey values must be known in advance and specified in the config. See details.
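Because each input is bound to a partition key known in advance, one possible way to read from several known partitions is to wrap one azure_cosmosdb input per partition key in a broker input. The following is only a sketch under that assumption: the second partition key value "HadalZone" is hypothetical, the account_key is left as a placeholder, and the remaining values are borrowed from the example further down this page.

```yaml
input:
  broker:
    inputs:
      # One input per partition key value that is known in advance.
      - azure_cosmosdb:
          endpoint: https://localhost:8081
          account_key: '!!!SECRET_SCRUBBED!!!'
          database: blobbase
          container: blobfish
          partition_keys_map: root = "AbyssalPlain"
          query: SELECT * FROM blobfish AS b
      - azure_cosmosdb:
          endpoint: https://localhost:8081
          account_key: '!!!SECRET_SCRUBBED!!!'
          database: blobbase
          container: blobfish
          partition_keys_map: root = "HadalZone" # hypothetical value
          query: SELECT * FROM blobfish AS b
```

Each child input still runs a single-partition query, so this does not lift the driver limitation; it only fans out over values you already know.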
Credentials
You can use one of the following authentication mechanisms:
- Set the endpoint field and the account_key field
- Set only the endpoint field to use DefaultAzureCredential
- Set the connection_string field
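As an illustration, the three mechanisms listed above could be configured roughly as follows. This is a sketch rather than a complete config: each YAML document shows only the authentication fields, the other required fields (database, container, partition_keys_map, query) are omitted, and the endpoint values are the placeholder ones used elsewhere on this page.

```yaml
# Option 1: endpoint plus account key
input:
  azure_cosmosdb:
    endpoint: https://localhost:8081
    account_key: '!!!SECRET_SCRUBBED!!!'
---
# Option 2: endpoint only, which falls back to DefaultAzureCredential
input:
  azure_cosmosdb:
    endpoint: https://localhost:8081
---
# Option 3: connection string only
input:
  azure_cosmosdb:
    connection_string: '!!!SECRET_SCRUBBED!!!'
```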
Metadata
This component adds the following metadata fields to each message:
- activity_id
- request_charge
You can access these metadata fields using function interpolation.
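For instance, a log processor could surface both metadata fields through interpolation. This is a minimal sketch: the pipeline section and the message wording are illustrative assumptions, and it relies on the standard metadata Bloblang function being available in your version.

```yaml
pipeline:
  processors:
    - log:
        level: INFO
        # Interpolates the metadata attached by the azure_cosmosdb input.
        message: 'activity_id=${! metadata("activity_id") } request_charge=${! metadata("request_charge") }'
```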
Examples
Execute a parametrized SQL query to select documents from a container.
input:
  azure_cosmosdb:
    endpoint: http://localhost:8080
    account_key: C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==
    database: blobbase
    container: blobfish
    partition_keys_map: root = "AbyssalPlain"
    query: SELECT * FROM blobfish AS b WHERE b.species = @species
    args_mapping: |
      root = [
        { "Name": "@species", "Value": "smooth-head" },
      ]
Fields
endpoint
CosmosDB endpoint.
Type: string
# Examples
endpoint: https://localhost:8081
account_key
Account key.
Type: string
# Examples
account_key: C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==
connection_string
Connection string.
Type: string
# Examples
connection_string: AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;
database
Database.
Type: string
# Examples
database: testdb
container
Container.
Type: string
# Examples
container: testcontainer
partition_keys_map
A Bloblang mapping which should evaluate to a single partition key value or an array of partition key values of type string, integer or boolean. Currently, hierarchical partition keys are not supported so only one value may be provided.
Type: string
# Examples
partition_keys_map: root = "blobfish"
partition_keys_map: root = 41
partition_keys_map: root = true
partition_keys_map: root = null
partition_keys_map: root = now().ts_format("2006-01-02")
query
The query to execute.
Type: string
# Examples
query: SELECT c.foo FROM testcontainer AS c WHERE c.bar = "baz" AND c.timestamp < @timestamp
args_mapping
A Bloblang mapping that, for each message, creates a list of arguments to use with the query.
Type: string
# Examples
args_mapping: |-
  root = [
    { "Name": "@name", "Value": "benthos" },
  ]
batch_count
The maximum number of messages that should be accumulated into each batch. Use -1 to specify a dynamic page size.
Type: int
Default: -1
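As a sketch of how this field might be set, the config below caps each batch at 100 messages instead of relying on the dynamic page size. The value 100 is an arbitrary illustrative choice, and the other values are the placeholders used in the examples on this page.

```yaml
input:
  azure_cosmosdb:
    endpoint: https://localhost:8081
    account_key: '!!!SECRET_SCRUBBED!!!'
    database: testdb
    container: testcontainer
    partition_keys_map: root = "blobfish"
    query: SELECT c.foo FROM testcontainer AS c WHERE c.bar = "baz"
    batch_count: 100 # illustrative; -1 (the default) uses a dynamic page size
```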
auto_replay_nacks
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to false, these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
Type: bool
Default: true
CosmosDB emulator
If you wish to run the CosmosDB emulator that is referenced in the documentation here, the following Docker command should do the trick:
> docker run --rm -it -p 8081:8081 --name=cosmosdb -e AZURE_COSMOS_EMULATOR_PARTITION_COUNT=10 -e AZURE_COSMOS_EMULATOR_ENABLE_DATA_PERSISTENCE=false mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator
Note: AZURE_COSMOS_EMULATOR_PARTITION_COUNT controls the number of partitions that will be supported by the emulator. The bigger the value, the longer it takes for the container to start up.
Additionally, instead of installing the container's self-signed certificate, which is exposed via https://localhost:8081/_explorer/emulator.pem, you can run [mitmproxy](https://mitmproxy.org/) like so:
> mitmproxy -k --mode "reverse:https://localhost:8081"
Then you can access the CosmosDB UI via http://localhost:8080/_explorer/index.html and use http://localhost:8080 as the CosmosDB endpoint.