Enrichment Workflow
This cookbook demonstrates how to enrich a stream of JSON documents with HTTP services. This method also works with AWS Lambda functions, subprocesses, etc.
We will start off by configuring a single enrichment, then we will move on to a workflow of enrichments with a network of dependencies using the `workflow` processor.
Each enrichment will be performed in parallel across a pre-batched stream of documents. Workflow enrichments that do not depend on each other will also be performed in parallel, making this orchestration method very efficient.
The imaginary problem we are going to solve is applying a set of NLP-based enrichments to a feed of articles in order to detect fake news. We will be consuming from and writing to Kafka, but the example works with any input and output combination.
Articles are received over the topic `articles` and look like this:
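For illustration, assume the documents are shaped something like this (the `type`, `article.id`, `article.title` and `article.content` fields are assumptions carried through the rest of the examples):

```json
{
  "type": "article",
  "article": {
    "id": "123foo",
    "title": "Dogs Stop Barking",
    "content": "The world was shocked this morning to find that all dogs have stopped barking."
  }
}
```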
Meet the Enrichments
Claims Detector
To start us off we will configure a single enrichment, which is an imaginary ‘claims detector’ service. This is an HTTP service that wraps a trained machine learning model to extract claims that are made within a body of text.
The service expects a POST request with a JSON payload of the form:
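A minimal sketch of such a request body, assuming the service accepts a single `text` field:

```json
{
  "text": "The plain text content of the article."
}
```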
And returns a JSON payload of the form:
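And a plausible response shape, assuming it returns a `claims` array (the structure of each claim is an assumption):

```json
{
  "claims": [
    {
      "entity": "world",
      "claim": "shocked"
    }
  ]
}
```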
Since each request only applies to a single document we will make this enrichment scale by deploying multiple HTTP services and hitting those instances in parallel across our document batches.
In order to send a mapped request and map the response back into the original document we will use the `branch` processor, with a child `http` processor.
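A sketch of that pipeline section, assuming the claims service is reachable at `localhost:4197` and the article fields shown earlier (batching itself would be configured on the input):

```yaml
pipeline:
  processors:
    - branch:
        # Build the request body expected by the claims service.
        request_map: 'root.text = this.article.content'
        processors:
          - http:
              url: http://localhost:4197/claims
              verb: POST
              # Send the requests for a batch in parallel.
              parallel: true
        # Store the result under a temporary field.
        result_map: 'root.tmp.claims = this.claims'
```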
With this pipeline our documents will come out looking something like this:
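Continuing with the illustrative article above, the result would be something like:

```json
{
  "type": "article",
  "article": {
    "id": "123foo",
    "title": "Dogs Stop Barking",
    "content": "The world was shocked this morning to find that all dogs have stopped barking."
  },
  "tmp": {
    "claims": [
      {
        "entity": "world",
        "claim": "shocked"
      }
    ]
  }
}
```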
Hyperbole Detector
Next up is a ‘hyperbole detector’ that takes a POST request containing the article contents and returns a hyperbole score between 0 and 1. This time the format is array-based and therefore supports calculating multiple documents in a single request, making better use of the host machine's GPU.
A request should take the following form:
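Again as an assumed sketch, an array of objects mirroring the single-document format above:

```json
[
  {
    "text": "The plain text content of the first article."
  },
  {
    "text": "The plain text content of the second article."
  }
]
```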
And the response looks like this:
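With the response assumed to be an array of scores in the same order as the request:

```json
[
  {
    "hyperbole_rank": 0.73
  },
  {
    "hyperbole_rank": 0.2
  }
]
```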
In order to create a single request from a batch of documents, and subsequently map the result back into our batch, we will use the `archive` and `unarchive` processors in our `branch` flow, like this:
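A sketch of the branch, assuming the hyperbole service listens at `localhost:4198`:

```yaml
- branch:
    request_map: 'root.text = this.article.content'
    processors:
      # Collapse the batch into a single JSON array document.
      - archive:
          format: json_array
      - http:
          url: http://localhost:4198/hyperbole
          verb: POST
      # Expand the response array back into the original batch.
      - unarchive:
          format: json_array
    result_map: 'root.tmp.hyperbole_rank = this.hyperbole_rank'
```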
The purpose of the `json_array` format `archive` processor is to take a batch of JSON documents and place them into a single document as an array, allowing us to send one request per batch. After the request is made we do the opposite with the `unarchive` processor in order to convert the response back into a batch of the original size.
Fake News Detector
Finally, we are going to use a ‘fake news detector’ that takes the article contents as well as the output of the previous two enrichments and calculates a fake news rank between 0 and 1.
This service behaves similarly to the claims detector service and takes a document of the form:
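Assuming it combines the article text with the two earlier enrichment results:

```json
{
  "text": "The plain text content of the article.",
  "claims": [
    {
      "entity": "world",
      "claim": "shocked"
    }
  ],
  "hyperbole_rank": 0.73
}
```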
And returns an object of the form:
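With a response containing the `fake_news_rank` field referenced below (the value shown is illustrative):

```json
{
  "fake_news_rank": 0.893
}
```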
We then wish to map the field `fake_news_rank` from that result into the original document at the path `article.fake_news_score`. Our `branch` block for this enrichment would look like this:
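A sketch, assuming the service is at `localhost:4199` and the temporary fields populated by the earlier branches:

```yaml
- branch:
    request_map: |
      root.text = this.article.content
      root.claims = this.tmp.claims
      root.hyperbole_rank = this.tmp.hyperbole_rank
    processors:
      - http:
          url: http://localhost:4199/fakenews
          verb: POST
    result_map: 'root.article.fake_news_score = this.fake_news_rank'
```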
Note that in our `request_map` we are targeting fields that are populated by the previous two enrichments.
If we were to execute all three enrichments in sequence we'd end up with a document looking like this:
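Carrying the illustrative values through:

```json
{
  "type": "article",
  "article": {
    "id": "123foo",
    "title": "Dogs Stop Barking",
    "content": "The world was shocked this morning to find that all dogs have stopped barking.",
    "fake_news_score": 0.893
  },
  "tmp": {
    "claims": [
      {
        "entity": "world",
        "claim": "shocked"
      }
    ],
    "hyperbole_rank": 0.73
  }
}
```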
Great! However, as a streaming pipeline this setup isn't ideal: our first two enrichments are independent and could be executed in parallel in order to reduce processing latency.
Combining into a Workflow
If we configure our enrichments within a `workflow` processor we can use Wombat to automatically detect our dependency graph, giving us two key benefits:
- Enrichments at the same level of a dependency graph (claims and hyperbole) will be executed in parallel.
- When introducing more enrichments to our pipeline the added complexity of resolving the dependency graph is handled automatically by Wombat.
Placing our branches within a `workflow` processor makes our final pipeline configuration look like this:
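A sketch of the full config, reusing the assumed service addresses and field names from the earlier examples (the Kafka addresses, consumer group, batching policy and output topic are placeholders):

```yaml
input:
  kafka:
    addresses: [ TODO ]
    topics: [ articles ]
    consumer_group: wombat_articles_group
    batching:
      count: 20
      period: 1s

pipeline:
  processors:
    - workflow:
        meta_path: meta.workflow
        branches:
          claims:
            request_map: 'root.text = this.article.content'
            processors:
              - http:
                  url: http://localhost:4197/claims
                  verb: POST
                  parallel: true
            result_map: 'root.tmp.claims = this.claims'

          hyperbole:
            request_map: 'root.text = this.article.content'
            processors:
              - archive:
                  format: json_array
              - http:
                  url: http://localhost:4198/hyperbole
                  verb: POST
              - unarchive:
                  format: json_array
            result_map: 'root.tmp.hyperbole_rank = this.hyperbole_rank'

          fake_news:
            request_map: |
              root.text = this.article.content
              root.claims = this.tmp.claims
              root.hyperbole_rank = this.tmp.hyperbole_rank
            processors:
              - http:
                  url: http://localhost:4199/fakenews
                  verb: POST
            result_map: 'root.article.fake_news_score = this.fake_news_rank'

    # Drop the temporary enrichment fields once the workflow is done.
    - mapping: |
        root = this
        root.tmp = deleted()

    # Log any documents that failed enrichment.
    - catch:
        - log:
            level: ERROR
            message: 'Enrichment workflow failed due to: ${! error() }'

output:
  kafka:
    addresses: [ TODO ]
    topic: articles_enriched
```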
Since the contents of `tmp` won't be required downstream we remove it after our enrichments using a `mapping` processor.
A `catch` processor was added at the end of the pipeline which catches documents that failed enrichment. You can replace the log event with a wide range of recovery actions such as sending to a dead-letter/retry queue, dropping the message entirely, etc. You can read more about error handling in this article.