This section outlines how Onyx programs execute behavior. Onyx uses plain Clojure functions to carry out distributed computation. Grouping and aggregation can optionally be performed on each function.
A Function is a construct that takes a segment as a parameter and outputs a segment or a seq of segments. Functions are meant to transform a single unit of data in a functional manner. The following is an example of a function:
(defn my-inc [{:keys [n] :as segment}]
  (assoc segment :n (inc n)))
Note that you may only pass segments between functions - no other shape of data is allowed.
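For instance, calling this function at the REPL with a hypothetical input segment:

;; A segment is just a Clojure map; my-inc returns a new map.
(my-inc {:n 41}) ;; => {:n 42}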
Example project: filtering
A function can be parameterized before a job is submitted to Onyx. The segment is always the last argument to the function. There are multiple ways to parameterize a function, and they can be used in combination.
Via the catalog :onyx/params entry
(def catalog
  [{...
    :my/param-1 "abc"
    :my/param-2 "def"
    :onyx/params [:my/param-1 :my/param-2]
    ...}])
The function is then invoked with (partial f "abc" "def"). The argument order is controlled by the vector of :onyx/params.
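Concretely, a task function parameterized this way receives the resolved values before the segment. A minimal sketch (my-param-fn is a hypothetical name):

;; Parameters resolved from the catalog entry arrive first;
;; the segment is always the final argument.
(defn my-param-fn [param-1 param-2 segment]
  (assoc segment :params [param-1 param-2]))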
Via :onyx.core/params in the before-task-start lifecycle hook
(defn before-task-start-hook [event lifecycle]
  {:onyx.core/params [42]})
The function is then invoked with (partial f 42).
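To attach this hook, reference it from a lifecycle calls map and register that map against the task in the job's :lifecycles vector. A sketch, assuming a task named :my-task and that the calls map lives in the current namespace:

(def my-calls
  {:lifecycle/before-task-start before-task-start-hook})

(def lifecycles
  [{:lifecycle/task :my-task
    :lifecycle/calls ::my-calls}])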
Using this approach "hard sets" the parameters list. Other parameters may already exist in :onyx.core/params. If you want to retain those parameters, concat them together and return the new value on :onyx.core/params.
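A sketch of a hook that preserves any existing parameters rather than replacing them:

(defn before-task-start-hook [event lifecycle]
  ;; Append 42 to whatever parameters are already present,
  ;; instead of hard-setting the list.
  {:onyx.core/params (concat (:onyx.core/params event) [42])})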
Via the :onyx.peer/fn-params peer configuration
(def peer-opts
  {...
   :onyx.peer/fn-params {:my-fn-name [64]}})
The function is then invoked with (partial f 64).
This approach is useful for parameterizing a task regardless of which job it is in. If both :onyx.peer/fn-params and :onyx/params are set for the same task, they are concatenated together, with fn-params coming first.
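For example, combining the peer configuration and the catalog entry shown above on the same hypothetical task, the resulting invocation would be:

;; fn-params come first, then :onyx/params, then the segment.
(partial f 64 "abc" "def")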
Example projects: parameterized, interface-injection, catalog-parameters
Grouping ensures that "like" values are always routed to the same virtual peer, presumably to compute an aggregate. Grouping is specified inside of a catalog entry. There are two ways to group: by a key of the segment, or by an arbitrary function. Grouping by key is a convenience that reaches into each segment and pins all segments with the same value for that key together. Grouping functions receive a single segment as input, and their output is the value to group on. Grouped functions must set the keys :onyx/min-peers and :onyx/flux-policy, described below.
To group by a key or a vector of keys in a segment, use :onyx/group-by-key in the catalog entry:
{:onyx/name :sum-balance
 :onyx/fn :onyx.peer.kw-grouping-test/sum-balance
 :onyx/type :function
 :onyx/group-by-key :name
 :onyx/min-peers 3
 :onyx/flux-policy :continue
 :onyx/batch-size 1000}
To group by an arbitrary function, use :onyx/group-by-fn in the catalog entry:
{:onyx/name :sum-balance
 :onyx/fn :onyx.peer.fn-grouping-test/sum-balance
 :onyx/type :function
 :onyx/group-by-fn :onyx.peer.fn-grouping-test/group-by-name
 :onyx/min-peers 3
 :onyx/flux-policy :continue
 :onyx/batch-size 1000}
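A grouping function takes a single segment and returns the value to group on. A minimal sketch of what group-by-name might look like (the real function lives in the Onyx test suite):

;; Receives one segment; the return value is the grouping key.
(defn group-by-name [segment]
  (:name segment))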
Functions that use the grouping feature are presumably stateful. For this reason, unless :continue is used, once a job begins, no new peers will be allocated to grouping tasks, no matter how many peers are added to the cluster. If more peers were added after the job begins, the hashing algorithm would lose its consistency, and stateful operations wouldn't work correctly.
Given that Onyx will not add more peers to grouping tasks after a job begins, we introduce a new parameter: :onyx/min-peers. This should be set to an integer indicating the minimum number of peers that will be allocated to this task before the job can begin. Onyx may schedule more than the minimum number that you set. You can create an upper bound by also using :onyx/max-peers.
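For instance, to pin a grouping task to exactly three peers, set the minimum and maximum to the same value. A sketch based on the entries above:

{:onyx/name :sum-balance
 :onyx/fn :onyx.peer.kw-grouping-test/sum-balance
 :onyx/type :function
 :onyx/group-by-key :name
 :onyx/min-peers 3
 :onyx/max-peers 3 ;; equal to min-peers, so exactly 3 peers run this task
 :onyx/flux-policy :kill
 :onyx/batch-size 1000}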
Example project: max-peers.
One immediate concern is what happens if a peer on a grouping task leaves the cluster after the job has begun. Removing a peer from a grouping task also breaks the consistent hashing algorithm that supports statefulness. The policy that is enforced in this situation is configurable and must be chosen by the developer. We offer three policies, outlined below.
When :onyx/flux-policy is set to :continue on a catalog entry, the hashing algorithm may be inconsistent. Peers can leave or join a task at any point in time. This is desirable for streaming jobs where the data is theoretically infinite, or for tasks that benefit from grouping but are not stateful.
When :onyx/flux-policy is set to :kill, the job is killed and all peers abort execution of the job. Some jobs cannot compute correct answers if there is a shift in the hashing algorithm's consistency. An example of this is a word count batch job.
When :onyx/flux-policy is set to :recover, the job continues running if any peers abort execution of the task. If any other peers are available, they will be added to this task to progressively meet the :onyx/min-peers number of peers concurrently working on it.
Sometimes you might be able to perform a function more efficiently over a batch of segments rather than processing one segment at a time, such as writing segments to a database in a non-output task. You can receive the entire batch of segments as an argument to your task by setting :onyx/batch-fn? to true in the catalog entry for your function.
Your function must return a sequence with the same number of elements as its incoming batch. The elements are then matched up positionally to pair parent segments with their outgoing child segments. Each element in the output may be either a single segment or a vector of segments, as normal.
The utility of this feature is that you can use functions that are more efficient over a large number of segments than over one at a time.
An example catalog entry:
{:onyx/name :inc
 :onyx/fn :onyx.peer.batch-function-test/my-inc
 :onyx/type :function
 :onyx/batch-fn? true
 :onyx/batch-size batch-size}
And an example catalog function to correspond to this entry:
(defn my-inc [segments]
  (map #(update-in % [:n] inc) segments))
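Because each output element may also be a vector of segments, a batch function can fan out. A hypothetical sketch in which every input segment produces two children:

(defn my-split [segments]
  ;; One output element per input element; each element here is a
  ;; vector of two child segments derived from the parent.
  (map (fn [{:keys [n] :as segment}]
         [(assoc segment :n (inc n))
          (assoc segment :n (dec n))])
       segments))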
The default value for this option is false.
Sometimes you're going to want a node in your workflow with no outgoing connections that doesn't perform I/O against a database. You can do this by setting :onyx/type to :output, :onyx/medium to :function, and :onyx/plugin to :onyx.peer.function/function. Then you can specify an :onyx/fn pointing to a regular Clojure function. For example:
{:onyx/name :leaf-task
 :onyx/fn ::add-to-results
 :onyx/plugin :onyx.peer.function/function
 :onyx/medium :function
 :onyx/type :output
 :onyx/batch-size 20}
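The function itself is ordinary Clojure. A sketch of what ::add-to-results might look like, accumulating segments into an atom (the atom and function names here are hypothetical):

(def results (atom []))

(defn add-to-results [segment]
  ;; Side effect: record the segment, then return it unchanged.
  (swap! results conj segment)
  segment)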