protobuf
Performs conversions to or from a protobuf message. This processor uses reflection, meaning conversions can be made directly from the target .proto files.
# Config fields, showing default valueslabel: ""protobuf: operator: "" # No default (required) message: "" # No default (required) discard_unknown: false use_proto_names: false import_paths: [] use_enum_numbers: false bsr: []The main functionality of this processor is to map to and from JSON documents, you can read more about JSON mapping of protobuf messages here: [https://github.com/redpanda-data/redpanda-connect-plugin-example](https://developers.google.com/protocol-buffers/docs/proto3#json
Using reflection for processing protobuf messages in this way is less performant than generating and using native code. Therefore when performance is critical it is recommended that you use Redpanda Connect plugins instead for processing protobuf messages natively, you can find an example of Redpanda Connect plugins at )(https://github.com/redpanda-data/redpanda-connect-plugin-example)
The processor will ignore any files that begin with a dot (”.“g), a convention for hidden files, when loading protocol buffer definitions.
Operators
to_json
Converts protobuf messages into a generic JSON structure. This makes it easier to manipulate the contents of the document within Redpanda Connect.
from_json
Attempts to create a target protobuf message from a generic JSON structure.
Examples
If we have the following protobuf definition within a directory called testing/schema:
syntax = "proto3";package testing;
import "google/protobuf/timestamp.proto";
message Person { string first_name = 1; string last_name = 2; string full_name = 3; int32 age = 4; int32 id = 5; // Unique ID number for this person. string email = 6;
google.protobuf.Timestamp last_updated = 7;}And a stream of JSON documents of the form:
{ "firstName": "caleb", "lastName": "quaye", "email": "caleb@myspace.com"}We can convert the documents into protobuf messages with the following config:
pipeline: processors: - protobuf: operator: from_json message: testing.Person import_paths: [ testing/schema ]If we have the following protobuf definition within a directory called testing/schema:
syntax = "proto3";package testing;
import "google/protobuf/timestamp.proto";
message Person { string first_name = 1; string last_name = 2; string full_name = 3; int32 age = 4; int32 id = 5; // Unique ID number for this person. string email = 6;
google.protobuf.Timestamp last_updated = 7;}And a stream of protobuf messages of the type Person, we could convert them into JSON documents of the format:
{ "firstName": "caleb", "lastName": "quaye", "email": "caleb@myspace.com"}With the following config:
pipeline: processors: - protobuf: operator: to_json message: testing.Person import_paths: [ testing/schema ]If we have the following protobuf definition within a BSR module hosted at buf.build/exampleco/mymodule:
syntax = "proto3";package testing;
import "google/protobuf/timestamp.proto";
message Person { string first_name = 1; string last_name = 2; string full_name = 3; int32 age = 4; int32 id = 5; // Unique ID number for this person. string email = 6;
google.protobuf.Timestamp last_updated = 7;}And a stream of JSON documents of the form:
{ "firstName": "caleb", "lastName": "quaye", "email": "caleb@myspace.com"}We can convert the documents into protobuf messages with the following config:
pipeline: processors: - protobuf: operator: from_json message: testing.Person bsr: - module: buf.build/exampleco/mymodule api_key: xxxIf we have the following protobuf definition within a BSR module hosted at buf.build/exampleco/mymodule:
syntax = "proto3";package testing;
import "google/protobuf/timestamp.proto";
message Person { string first_name = 1; string last_name = 2; string full_name = 3; int32 age = 4; int32 id = 5; // Unique ID number for this person. string email = 6;
google.protobuf.Timestamp last_updated = 7;}And a stream of protobuf messages of the type Person, we could convert them into JSON documents of the format:
{ "firstName": "caleb", "lastName": "quaye", "email": "caleb@myspace.com"}With the following config:
pipeline: processors: - protobuf: operator: to_json message: testing.Person bsr: - module: buf.build/exampleco/mymodule api_key: xxxxFields
operator
The operator to execute
Type: string
Options:
to_json
, from_json
.
message
The fully qualified name of the protobuf message to convert to/from.
Type: string
discard_unknown
If true, the from_json operator discards fields that are unknown to the schema.
Type: bool
Default: false
use_proto_names
If true, the to_json operator deserializes fields exactly as named in schema file.
Type: bool
Default: false
import_paths
A list of directories containing .proto files, including all definitions required for parsing the target message. If left empty the current directory is used. Each directory listed will be walked with all found .proto files imported. Either this field or bsr must be populated.
Type: array
Default: []
use_enum_numbers
If true, the to_json operator deserializes enums as numerical values instead of string names.
Type: bool
Default: false
bsr
Buf Schema Registry configuration. Either this field or import_paths must be populated. Note that this field is an array, and multiple BSR configurations can be provided.
Type: array
Default: []
bsr[].module
Module to fetch from a Buf Schema Registry e.g. ‘buf.build/exampleco/mymodule’.
Type: string
bsr[].url
Buf Schema Registry URL, leave blank to extract from module.
Type: string
Default: ""
bsr[].api_key
Buf Schema Registry server API key, can be left blank for a public registry.
Type: string
Default: ""
bsr[].version
Version to retrieve from the Buf Schema Registry, leave blank for latest.
Type: string
Default: ""