Joining Streams
This cookbook demonstrates how to merge JSON events from parallel streams using content-based rules and a cache of your choice.
The imaginary problem we are going to solve is hydrating a feed of article comments with information from their parent articles. We will be consuming and writing to Kafka, but the example works with any input and output combination.
Articles are received over the topic articles and look like this:
Comments can be posted either on an article or on a parent comment. They are received over the topic comments and look like this:
Our goal is to end up with a single stream of comments, where information about the root article of the comment is attached to the event. The above comment should exit our pipeline looking like this:
In order to achieve this we will need to cache articles as they pass through our pipelines and then retrieve them for each comment passing through. Since the parent of a comment might be another comment we will also need to cache and retrieve comments in the same way.
Caching Articles
Our first pipeline is very simple: we consume articles, reduce them to only the fields we wish to cache, and then cache them. If we receive the same article multiple times we’re going to assume it’s okay to overwrite the old article in the cache.
In this example I’m targeting Redis, but you can choose any of the supported cache targets. The TTL of cached articles is set to one week.
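As a rough sketch of what this first pipeline could look like, assuming article events keep their data under an `article` object with an `id` field. The consumer group name, the `hydration_cache` label, and the `TODO` addresses are placeholders chosen for illustration, not values from a real deployment:

```yaml
input:
  kafka:
    addresses: [ TODO ] # placeholder broker addresses
    topics: [ articles ]
    consumer_group: articles_cache_group # hypothetical group name

pipeline:
  processors:
    # Keep only the fields worth caching (assumes data lives under `article`).
    - mapping: 'root.article = this.article'

    # Write the reduced article to the cache, overwriting any earlier copy.
    - cache:
        resource: hydration_cache
        operator: set
        key: '${! json("article.id") }'
        value: '${! content() }'

# Articles are not forwarded anywhere once they are cached.
output:
  drop: {}

cache_resources:
  - label: hydration_cache # hypothetical resource label
    redis:
      url: TODO # placeholder Redis URL
      default_ttl: 168h # one week
```

Using the `set` operator means a repeated article simply replaces the cached copy, which matches the overwrite assumption above.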
Hydrating Comments
Our second pipeline consumes comments, caches each one in case a subsequent comment references it, obtains its parent (article or comment), and attaches the root article to the event before sending it to our output topic comments_hydrated.
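A minimal sketch of such a pipeline, assuming comments carry `comment.id` and an optional `comment.parent_id`, and that every cached entry (article or comment) stores the root article under an `article` field. The group name, cache label, and `TODO` values are placeholders:

```yaml
input:
  kafka:
    addresses: [ TODO ] # placeholder broker addresses
    topics: [ comments ]
    consumer_group: comments_hydration_group # hypothetical group name

pipeline:
  processors:
    # Look up the parent first so the cached comment can carry the root article.
    - branch:
        # Send only the parent ID to the cache; skip the branch if there is none.
        request_map: 'root = this.comment.parent_id | deleted()'
        processors:
          - cache:
              resource: hydration_cache
              operator: get
              key: '${! content() }'
        # Copy the root article from the cached parent into the comment.
        result_map: 'root.article = this.article'

    # Cache a reduced copy of the comment for any later child comments.
    - branch:
        request_map: |
          root.comment.id = this.comment.id
          root.article = this.article
        processors:
          - cache:
              resource: hydration_cache
              operator: set
              key: '${! json("comment.id") }'
              value: '${! content() }'
        # No result_map: nothing needs to be mapped back into the comment.

output:
  kafka:
    addresses: [ TODO ]
    topic: comments_hydrated

cache_resources:
  - label: hydration_cache # hypothetical resource label
    redis:
      url: TODO # placeholder Redis URL
      default_ttl: 168h # one week
```

Because a cached comment stores the root article rather than its immediate parent, a reply to a reply resolves the article with a single cache lookup.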
In this config we make use of the branch processor as it allows us to reduce documents into smaller maps for caching and gives us greater control over how results are mapped back into the document.
This pipeline satisfies our basic needs but errors aren’t handled at all, meaning intermittent cache connectivity problems that outlast our cache retries will result in failed documents entering our comments_hydrated topic. This is also the case if a comment arrives in our pipeline before its parent.
There are many patterns for error handling to choose from in Wombat. In this example we’re going to introduce a delayed retry queue, as it enables us to reprocess failed documents after a grace period and in isolation from our main pipeline.
Adding a Retry Queue
Our retry queue is going to be another topic called comments_retry. Since most of these errors resolve themselves given time, we will delay retry attempts by storing the current timestamp as a metadata field after a failed request.
We will use an input broker so that we can consume both the comments and comments_retry topics in the same pipeline.
Our config (omitting the caching sections for brevity) now looks like this:
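A minimal sketch of such a config, under a few assumptions: the one hour grace period, the `last_attempted` and `output_topic` metadata keys, the group name, and the `TODO` addresses are all illustrative choices. The `cache_resources` section is the same as before and is left out along with the caching steps:

```yaml
input:
  broker:
    inputs:
      - kafka:
          addresses: [ TODO ] # placeholder broker addresses
          topics: [ comments ]
          consumer_group: comments_hydration_group # hypothetical group name

      - kafka:
          addresses: [ TODO ]
          topics: [ comments_retry ]
          consumer_group: comments_hydration_group
        # These processors run only on the retry input, so sleeping here
        # delays retried comments without blocking the comments topic.
        processors:
          - sleep:
              # Assumed grace period of 3600 seconds since the last attempt.
              duration: '${! 3600 - ( timestamp_unix() - meta("last_attempted").number() ) }s'

pipeline:
  processors:
    - try:
        # The parent lookup and comment caching steps from the previous
        # pipeline go here, unchanged.

        # Reached only when every step above succeeded.
        - mapping: 'meta output_topic = "comments_hydrated"'

    - catch:
        # A step failed: route to the retry topic and record when we tried.
        - mapping: |
            meta output_topic = "comments_retry"
            meta last_attempted = timestamp_unix()

# Hydrated comments and failed comments share one output, routed by metadata.
output:
  kafka:
    addresses: [ TODO ]
    topic: '${! meta("output_topic") }'
```

Routing through a metadata field keeps a single output section, and the timestamp travels with the message so the retry input can work out how long to wait.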