Liking cljdoc? Tell your friends :D

Functions

This section outlines how Onyx programs execute behavior. Onyx uses plain Clojure functions to carry out distributed activity. You have the option of performing grouping and aggregation on each function.

Functional Transformation

A Function is a construct that takes a segment as a parameter and outputs a segment or a seq of segments. Functions are meant to literally transform a single unit of data in a functional manner. The following is an example of a function:

(defn my-inc [{:keys [n] :as segment}]
  (assoc segment :n (inc n)))

Note that you may only pass segments between functions - no other shape of data is allowed.

Example project: filtering

Function Parameterization

A function can be parameterized before a job is submitted to Onyx. The segment is always the last argument to the function. There are multiple ways to paramerize a function, and they can be used in combination.

  • Via the catalog :onyx/params entry

(def catalog
{...
 :my/param-1 "abc"
 :my/param-2 "def"
 :onyx/params [:my/param-1 :my/param-2]
 ...}

The function is then invoked with (partial f "abc" "def"). The order is controlled by the vector of :onyx/params.

  • Via :onyx.core/params in the before-task-start lifecycle hook

(defn before-task-start-hook [event lifecycle]
  {:onyx.core/params [42]})

The function is then invoked with (partial f 42).

Using this approach "hard sets" the parameters list. Other parameters may already exist in onyx.core/params. If you want to retain those parameter, concat them together and return the new value on onyx.core/params.

  • Via the :onyx.peer/fn-params peer configuration

(def peer-opts
  {...
   :onyx.peer/fn-params {:my-fn-name [64]}})

The function is then invoked with (partial f 64).

This approach is useful for parameterizing a task regardless of which job it is in. If both onyx.peer/fn-params and :onyx/params are set for the same task, they are concatenated together, with fn-params coming first.

Grouping & Aggregation

Grouping ensures that "like" values are always routed to the same virtual peer, presumably to compute an aggregate. Grouping is specified inside of a catalog entry. There are two ways to group: by key of segment, or by arbitrary function. Grouping by key is a convenience that will reach into each segment and pin all segments with the same key value in the segment together. Grouping functions receive a single segment as input. The output of a grouping function is the value to group on. Grouped functions must set keys :onyx/min-peers and :onyx/flux-policy. See below for a description of these.

Group By Key

To group by a key or a vector of keys in a segment, use :onyx/group-by-key in the catalog entry:

{:onyx/name :sum-balance
 :onyx/fn :onyx.peer.kw-grouping-test/sum-balance
 :onyx/type :function
 :onyx/group-by-key :name
 :onyx/min-peers 3
 :onyx/flux-policy :continue
 :onyx/batch-size 1000}

Group By Function

To group by an arbitrary function, use :onyx/group-by-fn in the catalog entry:

{:onyx/name :sum-balance
 :onyx/fn :onyx.peer.fn-grouping-test/sum-balance
 :onyx/type :function
 :onyx/group-by-fn :onyx.peer.fn-grouping-test/group-by-name
 :onyx/min-peers 3
 :onyx/flux-policy :continue
 :onyx/batch-size 1000}

Flux Policies

Functions that use the grouping feature are presumably stateful. For this reason, unless :continue is used, once a job begins, no matter how many peers are added to the cluster, no new peers will be allocated to grouping tasks. When more peers are added after the job begins, the hashing algorithm loses its consistency, and stateful operations won’t work correctly.

Given the fact the Onyx will not add more peers to regular grouping tasks after it begins, we introduce a new parameter - :onyx/min-peers. This should be set to an integer that indicates the minimum number of peers that will be allocated to this task before the job can begin. Onyx may schedule more than the minimum number that you set. You can create an upper bound by also using :onyx/max-peers.

Example project: max-peers.

One concern that immediately needs to be handled is addressing what happens if a peer on a grouping task leaves the cluster after the job has begun? Clearly, removing a peer from a grouping task also breaks the consistent hashing algorithm that supports statefulness. The policy that is enforced is configurable, and must be chosen by the developer. We offer three policies, outlined below.

Continue Policy

When :onyx/flux-policy is set to :continue on a catalog entry, the hashing algorithm may be inconsistent. Peers can leave or join a task at any point in time. This is desirable for streaming jobs where the data is theoretically infinite or have tasks that benefit from grouping but are not stateful.

Kill Policy

When :onyx/flux-policy is set to :kill, the job is killed and all peers abort execution of the job. Some jobs cannot compute correct answers if there is a shift in the hashing algorithm’s consistency. An example of this is a word count batch job.

Recover Policy

When :onyx/flux-policy is set to :recover, the job continues as is if any peers abort execution of the task. If any other peers are available, they will be added to this task to progressively meet the :onyx/min-peers number of peers concurrently working on this task.

Batch Functions

Sometimes you might be able to perform a function more efficiently over a batch of segments rather than processing one segment at a time, such as writing segments to a database in a non-output task. You can receive the entire batch of segments as an argument to your task by setting :onyx/batch-fn? to true in your catalog entry for your function. Your function must return a sequence with the same number of elements as its incoming batch has. The elements are then matched up positionally to pair parent segments with their outgoing child segments. Elements in the output may either be a single segment or a vector of segments, as normal. The utility of this feature is you can use functions that are more efficient over a large number of segments rather than one at a time.

An example catalog entry:

{:onyx/name :inc
 :onyx/fn :onyx.peer.batch-function-test/my-inc
 :onyx/type :function
 :onyx/batch-fn? true
 :onyx/batch-size batch-size}

And an example catalog function to correspond to this entry:

(defn my-inc [segments]
  (map #(update-in % [:n] inc) segments))

The default value for this option is false.

Leaf Functions

Sometimes you’re going to want a node in your workflow with no outgoing connections that doesn’t perform I/O against a database. You can do this by setting :onyx/type to :output, :onyx/medium to :function, and :onyx/plugin to onyx.peer.function/function. Then you can specify an :onyx/fn pointing to a regular Clojure function. For example:

{:onyx/name :leaf-task
 :onyx/fn ::add-to-results
 :onyx/plugin :onyx.peer.function/function
 :onyx/medium :function
 :onyx/type :output
 :onyx/batch-size 20}

Can you improve this documentation?Edit on GitHub

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close