Streams Via Config Files
When running Wombat in streams
mode it’s possible to create streams with their own static configurations, simply
list one or more files after the streams
subcommand:
wombat streams ./foo.yaml ./configs/*.yaml
Resources
A stream configuration should only include the base stream component fields (input
, buffer
, pipeline
, output
),
and therefore should NOT include any resources. Instead, define resources separately and import them
using the -r
/--resources
flag:
wombat -r "./resources/prod/*.yaml" streams ./stream_configs/*.yaml
Walkthrough
Make a directory of stream configs:
$ mkdir ./streams
$ cat > ./streams/foo.yaml <<EOFinput: http_server: {}pipeline: threads: 4 processors: - mapping: 'root = {"id": this.user.id, "content": this.body.content}'output: http_server: {}EOF
$ cat > ./streams/bar.yaml <<EOFinput: kafka: addresses: - localhost:9092 topics: - my_topicpipeline: threads: 1 processors: - mapping: 'root = this.uppercase()'output: elasticsearch: urls: - http://localhost:9200EOF
Run Wombat in streams mode, pointing to our directory of streams:
$ wombat streams ./streams/*.yaml
On a separate terminal you can query the set of streams loaded:
$ curl http://localhost:4195/streams | jq '.'{ "bar": { "active": true, "uptime": 19.381001424, "uptime_str": "19.381001552s" }, "foo": { "active": true, "uptime": 19.380582951, "uptime_str": "19.380583306s" }}
You can also query a specific stream to see the loaded configuration:
$ curl http://localhost:4195/streams/foo | jq '.'{ "active": true, "uptime": 69.334717193, "uptime_str": "1m9.334717193s", "config": { "input": { "http_server": { "address": "", "cert_file": "", "key_file": "", "path": "/post", "timeout": "5s" } }, "buffer": { "memory": { "limit": 10000000 } }, "pipeline": { "processors": [ { "mapping": "root = {\"id\": this.user.id, \"content\": this.body.content}", } ], "threads": 4 }, "output": { "http_server": { "address": "", "cert_file": "", "key_file": "", "path": "/get", "stream_path": "/get/stream", "timeout": "5s" } } }}
You can then send data to the stream via its namespaced URL:
$ curl http://localhost:4195/foo/post -d '{"user":{"id":"foo"},"body":{"content":"bar"}}'
There are other endpoints in the REST API for creating, updating and deleting streams.