Liking cljdoc? Tell your friends :D

Aggregation & State Management

This section discusses state management and fault tolerance used in windowing/streaming joins.

Summary

Onyx provides the ability to perform updates to a state machine for segments which are calculated over windows. For example, a grouping task may accumulate incoming values for a number of keys over windows of 5 minutes. This feature is commonly used for aggregations, such as summing values, though it can be used to build more complex state machines.

State Example

;; Task definition
{:onyx/name :sum-all-ages
 :onyx/fn :clojure.core/identity
 :onyx/type :function
 :onyx/group-by-key :name
 :onyx/flux-policy :recover
 :onyx/min-peers 2
 :onyx/batch-size 20}

;; Window definition
{:window/id :sum-all-ages-window
 :window/task :sum-all-ages
 :window/type :global
 :window/aggregation [:your-sum-ns/sum :age]
 :window/doc "Adds the :age key in all segments in a global window."}

As segments are processed, an internal state within the calculated window is updated. In this case we are trying to sum the ages of the incoming segments.

The :sum-all-ages window definition referenced above, contains a :window/aggregation map. These window aggregation maps are defined by containing the following keys (see the cheat-sheet for more information):

KeyOptional?Description

:aggregation/init

true

Fn (window) to initialise the state.

:aggregation/create-state-update

false

Fn (window, segment) to generate a serializable state machine update.

:aggregation/apply-state-update

false

Fn (window, state, entry) to apply state machine update entry to a state.

:aggregation/super-aggregation-fn

true

Fn (window, state-1, state-2) to combine two states in the case of two windows being merged.

The :window/aggregation keys should map to corresponding functions. This example shows those function definitions, paired with the :window/aggregation keys, and bound to the ::sum aggregation referenced above.

(ns your-sum-ns)


(defn sum-init-fn [window]
  0)

;; Given the example input in the next section, the below segment shape (over a kafka transport) would look something like this.
;; {:serialized-key-size 36,
;;  :key "70144dea-cdd1-443d-9e7f-55cc5d0928d7",
;;  :offset 0,
;;  :serialized-value-size 22,
;;  :partition 0,
;;  :timestamp 1514680072539,
;;  :message {:age 49, :name "John"}}
(defn sum-aggregation-fn [window segment]
  (let [k (-> segment :message :age)]
    {:value k}))

;; Now just pull out the value and add it to the previous state
(defn sum-application-fn [window state value]
  (+ state (:value value)))


;; sum aggregation referenced in the window definition.
(def sum
  {:aggregation/init sum-init-fn
   :aggregation/create-state-update sum-aggregation-fn
   :aggregation/apply-state-update sum-application-fn})

Let’s try processing some example segments using this aggregation:

[{:name "John" :age 49}
 {:name "Madeline" :age 55}
 {:name "Geoffrey" :age 14}]

Results in the following events:

ActionResult

Initial state

0

Incoming segment

{:name "John" :age 49}

Changelog entry

[:set-value 49]

Applied to state

49

Incoming segment

{:name "Madeline" :age 55}

Changelog entry

[:set-value 104]

Applied to state

104

Incoming segment

{:name "Geoffrey" :age 14}

Changelog entry

[:set-value 128]

Applied to state

128

This state can be emitted via triggers or another mechanism. By describing changelog updates as a vector with a log command, such as :set-value aggregation function can emit multiple types of state transition if necessary.

Fault Tolerance

To allow for full recovery after peer crashes, the window and trigger states must be checkpointed.

A consistent snapshot is performed over the cluster every :onyx.peer/coordinator-barrier-period-ms ms. Whenever a change to the cluster allocation occurs, this state snapshot is recovered from durable storage.

Storage can be configured via the peer-config.

The ZooKeeper window storage choice should not be used in production, unless paired with a periodic call to onyx.api/gc-checkpoints.

As checkpoints will only accrete, typical production use should incorporate onyx.api/gc-checkpoints, and onyx.api/clear-checkpoints when any checkpoints will not be used to transfer state from one job to another via resume-points.

Exactly Once Side-Effects

Exactly once side-effects resulting from a segment being processed may occur, as exactly once side-effects are impossible to achieve. Onyx guarantees that a window state updates resulting from a segment are performed exactly once, however any side-effects that occur as a result of the segment being processed cannot be guaranteed to only occur once.

Can you improve this documentation?Edit on GitHub

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close