System’s workflow a vector describing the dataflow of the system. Each component has an input and an output stream. Each
stream is a`manifold.stream`. HellHound connects io of each component to another component based on the desciption given
by the :workflow
of the system.
System’s workflow is a vector of vectors. Each vector contains two mandatory element which are:
For example, consider the following system:
Snippet 1 - A HellHound system
(ns example.system
;; We want to use defcomponenet to define the components instead of defining the maps
(:require [hellhound.system :as hh :refer [defcomponent]]
[hellhound.system :as hcomp]
[manifold.stream :as s]))
(defn dummy-start (1)
[component ctx]
(let [input (hcomp/input component)
output (hcomp/output component)])
;; Simply direct incoming data to output
(s/connect input output)
component)
(defn dummy-stop [component] component)
(def example-system
{:components [(defcomponent :component-1 dummy-start dummy-stop) (2)
(defcomponent :component-2 dummy-start dummy-stop) (3)
(defcomponent :component-3 dummy-start dummy-stop)] (4)
:workflow [[:component-1 :component-2] (5)
[:component-2 :component-3]]}) (6)
(hh/set-system! example-system) (7)
(hh/start!) (8)
1 | A dummy start function which connects input to output directly |
2 | Definition of component-1 |
3 | Definition of component-2 |
4 | Definition of component-2 |
5 | Description to connect output of component-1 to input component-2 |
6 | Description to connect output of component-2 to input component-3 |
7 | Sets example-system as the current system |
8 | Starts the current system. |
In the above snippet we defined three different components. For the sake of simplicity I skipped the start and
stop function defination of each component.
After starting the system defined in the above snippet, HellHound wires the input/output of each component
as we described in the system. So the running system workflow would be like:
[Workflow 1]
Data --> Component 1 ---> Component 2 ---> Component 3 ---> ...
As you can see in the above flow, the output of component-3
goes to no where. It’s not necessary for a component to has
both input and output connected to somewhere. Some components are responsible for side effects. For example writing to DB,
Send data to client or dispatch an event to a kafka topic. These kind of components may not have an output stream. An other
example would be those components which provide data to the pipeline by reading from a DB or from an external stream like a
kafka stream of client side event stream and so on. These components do not need an input stream to operate.
So in the example-system
we can test the workflow simply like this:
Snippet 2 - An example of a working workflow
(let [input1 (hcomp/input (hh/get-component :component-1))
output3 (hcomp/output (hh/get-component :component-3))]
(s/put! input1 42) (1)
(println "output: " @(s/take! output3))) ;; ==> output: 42 (2)
1 | Put number 42 into the pipeline enterance point |
2 | Take the output from the output point of the pipeline |
Since we we connected our pipeline as described in workflow 1, The value which we put into input stream of component-1
,
retreived from the output stream of component-3
.
|
Don’t forget that our components simply connect their input stream to their output stream
|
Now let’s change our workflow to create a tree like pipeline. Now let’s change the :workflow
of example-system
as follow:
:workflow [[:component-1 :component-2]
[:component-1 :component-3]]
Now with this new workflow, our dataflow would be like:
[Workflow 2]
|---> Component 2 ---> ...
Data --> Component 1 --> |
|---> Component 3 ---> ...
So both of component-2
and component-3
would get their input from component-1
. Any input comes to component-1
would be available
separately in the output of component-2
and component-3
and taking that value from the output of component-2
would not affect the
output of component-3
.
Sometimes, we need to dispatch values to a component conditionally. For example imagin a system that is responsible for separeting odd numbers
from even numbers in a stream of numbers. Checkout the following workflow
definition:
:workflow [[:component-1 odd? :component-2]
[:component-1 even? :component-3]]
As you can see in the workflow definition above, it’s possible to describe a condition for dispatching values from a component to
another one. In the previous example all the values from :component-1
flow to :component-2
only if the provided condition
returns true for each incoming value which in this case only odd
values are going to deliver to component-2
. The same applies to
the second pipe but only even
values flow to component-3
.
The predicate function can be any function which receives an argument ( The value from upstream component ) and returns a boolean value
indicating whether the value should flow to the downstream component or not.
Base on what we discussed up until now we can test our workflow like this:
Snippet 3 - Testing conditional workflow
;; I skipped the ns declaration. It would be exactly like Snippet 1 in this section
(let [input1 (hcomp/input (hh/get-component :component-1))
output2 (hcomp/output2 (hh/get-component :component-2))
output3 (hcomp/output2 (hh/get-component :component-3))]
(s/consume #(println "Odd: " %) output2) (1)
(s/consume #(println "Even: " %) output3) (2)
(-> [1 2 3 4 5]
(s/->source) (3)
(s/connect input1))) (4)
1 | Adds a consumer function for the output stream of :component-2 |
2 | Adds a consumer function for the output stream of :component-3 |
3 | Converts the [1 2 3 4 5] vector to a stream source |
4 | Connects the source stream resulted in step <3> to input of :component-1 |
The output of the above snippet would be like:
Odd: 1
Even: 2
Odd: 3
Even: 4
Odd: 5
|
Predicate functions should be pure
Predicate functions in each pipe should be pure and free of side effects. These functions should
be as fast as possible because HellHound calls them rapidly for each value in the pipe.
|
<PLACEHOLDER TEXT> Explaination about predicate best practices and hellhound.messaging
ns