Building Pipelines

As mentioned before, a Wombat pipeline is defined by a configuration file that describes how messages are processed. Writing configurations isn’t difficult at all. We even provide a generator for you to get started quickly:

wombat create -s stdin/mapping/stdout > my_pipeline.yaml

Running this command will create a new pipeline configuration file called my_pipeline.yaml that reads from stdin, applies a simple mapping processor, and writes to stdout.

input:
  stdin:
    scanner:
      lines: {}
    auto_replay_nacks: true
buffer:
  none: {}
pipeline:
  threads: -1
  processors:
    - mapping: "" # No default (required)
output:
  stdout:
    codec: lines

How brilliant is that!? You can now run your pipeline with:

wombat -c my_pipeline.yaml

Everything you type into the terminal will be processed by Wombat and written back to the terminal. Obviously our job is done now and we can revel in the glory of our creation.
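The generated mapping is empty, so it helps to see one that does something. The following is a minimal sketch rather than generator output: it assumes the Bloblang function `content()` and method `uppercase()` behave as they do in upstream Benthos, and it trims the generated file down to the fields that matter here.

```yaml
# Sketch: my_pipeline.yaml with the empty mapping filled in.
input:
  stdin:
    scanner:
      lines: {}

pipeline:
  processors:
    # Assumption: content() returns the raw message and uppercase()
    # converts it to upper case, as in upstream Bloblang.
    - mapping: root = content().uppercase()

output:
  stdout:
    codec: lines
```

Run with `wombat -c my_pipeline.yaml`, each line typed into the terminal should come back in upper case.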

Structure

But just in case you are still hungry for more, here is a more detailed explanation of the configuration file. The configuration structure is made up of a number of root sections:

  • Core Components
  • Resource Components
  • Observability Components

Each of these sections can contain a number of components that are used to define the behaviour of the pipeline.

Core Components

Every Wombat pipeline configuration has at least one input and one output. A pipeline section, which contains processors and allows them to run in parallel, is optional. More advanced configurations may also include a buffer section, which lets messages be buffered before they are flushed to the pipeline or the output.

These components are considered to be the main components within Wombat as they provide the majority of useful behaviour.

input:
  kafka:
    addresses: [ TODO ]
    topics: [ foo, bar ]
    consumer_group: foogroup
buffer:
  none: {}
pipeline:
  processors:
    - mapping: |
        root.message = this
        root.meta.link_count = this.links.length()
output:
  aws_s3:
    bucket: TODO
    path: '${! metadata("kafka_topic") }/${! json("message.id") }.json'
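The two optional sections, buffer and pipeline, can be sketched in isolation as follows. This fragment is illustrative and not taken from the Wombat documentation: the `memory` buffer and its `limit` field are assumed to match upstream Benthos, and the values `524288000` and `threads: 4` are arbitrary.

```yaml
input:
  stdin: {}

# Optional: hold messages in memory between the input and the pipeline.
# Assumption: `limit` is the maximum buffer size in bytes.
buffer:
  memory:
    limit: 524288000

# Optional: run the processors on four parallel threads.
pipeline:
  threads: 4
  processors:
    - mapping: root = content().uppercase()

output:
  stdout: {}
```

Leaving out both sections gives the simplest valid configuration: one input wired directly to one output.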

Resource Components

Resource components like caches and rate limits are referenced and shared among the core components.

input:
  http_client: # This is an input
    url: TODO
    rate_limit: foo_ratelimit # This is a reference to a rate limit
pipeline:
  processors:
    - cache: # This is a processor
        resource: baz_cache # This is a reference to a cache
        operator: add
        key: '${! json("id") }'
        value: "x"
    - mapping: root = if errored() { deleted() }
rate_limit_resources:
  - label: foo_ratelimit
    local:
      count: 500
      interval: 1s
cache_resources:
  - label: baz_cache
    memcached:
      addresses: [ localhost:11211 ]

It’s also possible to configure inputs, outputs and processors as resources, which allows them to be reused throughout a configuration via the resource input, resource output and resource processor respectively.
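As a rough sketch of what that looks like for a processor, the fragment below defines a mapping once and references it by label. The label `drop_errors` is hypothetical, and the `processor_resources` section name and `resource` processor are assumed to follow upstream Benthos conventions.

```yaml
pipeline:
  processors:
    # Reference the shared processor by its label.
    - resource: drop_errors

# Assumption: processors are declared as resources under this root key.
processor_resources:
  - label: drop_errors # hypothetical label
    mapping: root = if errored() { deleted() }
```

The same label can then be referenced from any other place in the configuration that accepts processors, so the mapping is written only once.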

Observability Components

Finally, there are the observability components http, logger, metrics, and tracer, which allow you to specify how Wombat exposes observability data:

http:
  address: 0.0.0.0:4195
  enabled: true
  debug_endpoints: false
logger:
  format: json
  level: WARN
metrics:
  statsd:
    address: localhost:8125
    flush_period: 100ms
tracer:
  jaeger:
    agent_address: localhost:6831

More details on observability and monitoring can be found in the observability section.