
Input

An input is a source of data piped through an array of optional processors.

input:
  label: my_redis_input
  redis_streams:
    url: tcp://localhost:6379
    streams:
      - wombat_stream
    body_key: body
    consumer_group: wombat_group
  # Optional list of processing steps
  processors:
    - mapping: |
        root.document = this.without("links")
        root.link_count = this.links.length()

Every Wombat pipeline has exactly one root input. However, this root input can be a broker which combines multiple inputs and merges the streams:

input:
  broker:
    inputs:
      - kafka:
          addresses: [ TODO ]
          topics: [ foo, bar ]
          consumer_group: foogroup
      - redis_streams:
          url: tcp://localhost:6379
          streams:
            - wombat_stream
          body_key: body
          consumer_group: wombat_group

Some inputs have a logical end. For example, a csv input ends once the last row is consumed. When this happens the input terminates gracefully, and Wombat shuts itself down once all messages have been fully processed.
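
As a rough sketch of an input with a logical end, the config below reads a single CSV file and then lets the process exit. The path is hypothetical, and the parse_header_row field and the stdout output are assumed to behave as they do in upstream Benthos; check the csv input reference before relying on them.

```yaml
input:
  csv:
    # Hypothetical path, used for illustration only
    paths: [ "./data.csv" ]
    # Assumed field: treat the first row as column names
    parse_header_row: true

output:
  stdout: { }
```

Once the last row has been consumed and written to the output, the input closes and Wombat shuts down.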

It’s also possible to specify a logical end for an input that otherwise doesn’t have one with the read_until input, which checks a condition against each consumed message in order to determine whether it should be the last.
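
A minimal sketch of read_until wrapping an otherwise endless input is shown below. The message field type and its value "end" are assumptions made for illustration, and the check and input field names follow the upstream Benthos read_until input.

```yaml
input:
  read_until:
    # Stop once a consumed message satisfies this condition.
    # "type" and "end" are hypothetical and depend on your data.
    check: this.type == "end"
    input:
      kafka:
        addresses: [ TODO ]
        topics: [ foo, bar ]
        consumer_group: foogroup
```

With this in place the kafka input is treated as finished when the check first returns true, after which Wombat shuts down in the same way as for an input with a natural end.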

Scanners

For most Wombat inputs the data consumed comes pre-partitioned into discrete messages which can be comfortably held and processed in memory. However, some inputs such as the file input often need to consume data that is large enough that it cannot be processed entirely within memory, and others such as the socket input don’t have a concept of consuming the data “entirely”.

For such inputs it’s necessary to define a mechanism by which the stream of source bytes can be chopped into smaller logical messages, which are then processed and output continuously while the stream is still being read. This dramatically reduces the memory usage of Wombat as a whole and results in a more fluid flow of data.

This chopping mechanism is defined through scanners, configured as a field on each input that requires one. For example, if we wished to consume files line-by-line, with each individual line being processed as a discrete message, we could use the lines scanner with our file input:

input:
  file:
    paths: [ "./*.txt" ]
    scanner:
      lines: { }
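
The same pattern should apply to inputs that never finish, such as the socket input. The sketch below assumes the socket input takes network and address fields as in upstream Benthos, and the address is a hypothetical placeholder.

```yaml
input:
  socket:
    network: tcp
    # Hypothetical address, used for illustration only
    address: localhost:4000
    scanner:
      # Emit one message per newline-delimited line read from the connection
      lines: { }
```

Here each line arriving on the connection becomes its own message, so nothing needs to wait for the stream to end.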

A scanner is a plugin similar to any other core Wombat component (inputs, processors, outputs, etc), which means it’s possible to define your own scanners that can be utilised by inputs that need them.

Input Processors

Inputs can have an optional list of processors that are applied to each message as it is consumed. These processors are applied before the message is passed to the rest of the pipeline.

This makes them a good place to perform filtering or transformation that is specific to the input source, so that the rest of the pipeline can stay generic and need not know about the specifics of that source.

For example, imagine you want to filter messages based on one of their fields before passing them to the rest of the pipeline:

input:
  kafka:
    addresses: [ TODO ]
    topics: [ foo, bar ]
    consumer_group: foogroup
  processors:
    - mapping: |-
        root = if this.active {
          this
        } else {
          deleted()
        }
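
Input processors can also be attached to the individual children of a broker, which is one way to keep source-specific handling out of the shared pipeline. The sketch below tags each message with where it came from; the metadata key source and its values are hypothetical names chosen for illustration.

```yaml
input:
  broker:
    inputs:
      - kafka:
          addresses: [ TODO ]
          topics: [ foo, bar ]
          consumer_group: foogroup
        # Applies only to messages read from kafka
        processors:
          - mapping: |
              meta source = "kafka"
      - redis_streams:
          url: tcp://localhost:6379
          streams:
            - wombat_stream
          body_key: body
          consumer_group: wombat_group
        # Applies only to messages read from redis_streams
        processors:
          - mapping: |
              meta source = "redis"
```

Later stages can then branch on the source metadata value without needing to know how each input is configured.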

Metrics

The following metrics are exposed for an input.

  • input_received: A count of the number of messages received by the input.
  • input_latency_ns: Measures the roundtrip latency in nanoseconds from the point at which a message is read up to the moment the message has either been acknowledged by an output, has been stored within a buffer, or has been rejected (nacked).
  • batch_created: A count of each time an input-level batch has been created using a batching policy. Includes a label, mechanism, describing the particular mechanism that triggered it, one of: count, size, period, check.
  • input_connection_up: For continuous stream based inputs represents a count of the number of times the input has successfully established a connection to the target source. For poll based inputs that do not retain an active connection this value will increment once.
  • input_connection_failed: For continuous stream based inputs represents a count of the number of times the input has failed to establish a connection to the target source.
  • input_connection_lost: For continuous stream based inputs represents a count of the number of times the input has lost a previously established connection to the target source.
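
To see these metrics in practice, the sketch below pairs an input-level batching policy with a metrics section that keeps only the names listed above. It assumes the kafka input exposes a batching field and that the metrics mapping and prometheus fields behave as in upstream Benthos; the batch sizes are arbitrary illustrative values.

```yaml
input:
  kafka:
    addresses: [ TODO ]
    topics: [ foo, bar ]
    consumer_group: foogroup
    # Assumed input-level batching policy: flush after 100 messages or
    # after 1s, whichever comes first. Each flush should increment
    # batch_created with the mechanism label set to count or period.
    batching:
      count: 100
      period: 1s

metrics:
  # Drop every metric whose name is not in the list below
  mapping: |
    root = if ![
      "input_received",
      "input_latency_ns",
      "batch_created",
      "input_connection_up",
      "input_connection_failed",
      "input_connection_lost"
    ].contains(this) { deleted() }
  prometheus: { }
```

Filtering at the metrics level keeps the exported series small while the input behaviour is being investigated.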