
Triggers

This section covers Triggers, a feature that interacts with Windows. Windows capture and bucket data over time. Triggers let you release the captured data in response to a variety of stimuli.

Summary

Windows capture data over time and place segments into discrete, possibly overlapping buckets. On their own, windows are of limited use: to harness the information that has been captured and rolled up, we need to move it somewhere. Triggers let us interact with the state in each extent of a window.
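As a rough sketch of how the two fit together, a window entry and a trigger entry might be paired as shown here. The task name, window key, range, and the :sync-collected id are assumptions made for illustration; the trigger keys are the ones described in this section.

```clojure
;; Illustrative only: the task name, window key, and range are assumptions.
(def windows
  [{:window/id :collect-segments
    :window/task :identity
    :window/type :fixed
    :window/aggregation :onyx.windowing.aggregation/conj
    :window/window-key :event-time
    :window/range [5 :minutes]}])

;; The trigger points at the window above through :trigger/window-id.
;; ::write-to-stdout stands for a sync function defined in this namespace.
(def triggers
  [{:trigger/id :sync-collected
    :trigger/window-id :collect-segments
    :trigger/on :onyx.triggers/segment
    :trigger/threshold [5 :elements]
    :trigger/sync ::write-to-stdout}])
```

The only link between the two maps is the shared id: the trigger's :trigger/window-id must match a :window/id in the collection of windows.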

Example project: aggregation

Trigger Types

Onyx ships a number of trigger implementations that can be used out of the box. Each trigger fires in response to a particular stimulus. All triggers implemented in Onyx core also fire at task completion. We outline each here and show an example of each in action.

:timer

This trigger sleeps for a duration of :trigger/period. When it is done sleeping, the :trigger/sync function is invoked with its usual arguments. The trigger then goes back to sleep and the cycle repeats.

{:trigger/window-id :collect-segments
 :trigger/on :onyx.triggers/timer
 :trigger/period [3 :seconds]
 :trigger/sync ::write-to-dynamo
 :trigger/doc "Writes state to DynamoDB every 3 seconds, discarding intermediate state"}

:segment

The trigger wakes up in reaction to a new segment being processed. It fires once every :trigger/threshold segments. When the threshold is reached, the count of new segments resets to 0 and counting starts again in the same manner.

{:trigger/window-id :collect-segments
 :trigger/on :onyx.triggers/segment
 :trigger/fire-all-extents? true
 :trigger/threshold [5 :elements]
 :trigger/sync ::write-to-stdout
 :trigger/doc "Writes the window contents to stdout every 5 segments"}
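To make the counting behavior concrete, here is a plain-Clojure model of it. This is a sketch of the behavior described above, not Onyx's implementation, and fire-points is a hypothetical helper name.

```clojure
;; Models a counter that fires and resets every `threshold` segments.
(defn fire-points
  "Returns the 1-based positions of the segments at which a trigger with
  the given threshold would fire, out of n incoming segments."
  [threshold n]
  (loop [i 1, cnt 0, fired []]
    (if (> i n)
      fired
      (let [cnt (inc cnt)]
        (if (>= cnt threshold)
          ;; Threshold reached: record the firing and reset the count to 0.
          (recur (inc i) 0 (conj fired i))
          (recur (inc i) cnt fired))))))

(fire-points 5 12) ;; => [5 10]
```

With a threshold of 5 and 12 segments, the model fires on the 5th and 10th segments; the last two segments are counted but do not reach the threshold.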

:punctuation

The trigger wakes up in reaction to a new segment being processed. It fires only if :trigger/pred evaluates to true. :trigger/pred is a function of two arguments: the trigger and the state-event. Punctuation triggers are often useful for sending signals through the stream that indicate that no more data will be coming for a particular window of time.

{:trigger/window-id :collect-segments
 :trigger/on :onyx.triggers/punctuation
 :trigger/pred ::trigger-pred
 :trigger/sync ::write-to-stdout
 :trigger/doc "Writes the window contents to stdout if :trigger/pred is true for this segment"}
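A predicate for this trigger might look like the following sketch. It assumes that the state-event map exposes the segment being processed under :segment, and that upstream tasks send a hypothetical marker segment containing {:done? true}; neither detail is specified in this section.

```clojure
;; Assumption: the current segment is available as (:segment state-event).
;; Assumption: a marker segment such as {:done? true} signals end of data.
(defn trigger-pred
  [trigger state-event]
  (true? (:done? (:segment state-event))))
```

The predicate returns true only for the marker segment, so the trigger stays quiet for ordinary segments.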

:watermark

The trigger wakes up in reaction to a new segment being processed. It fires only if the value of :window/window-key in the segment exceeds the upper bound of the extent of an active window. This is a shortcut for a punctuation trigger that fires when any piece of data has a time-based window key above the upper bound of another extent, effectively declaring that no more data for earlier windows will arrive.

{:trigger/window-id :collect-segments
 :trigger/on :onyx.triggers/watermark
 :trigger/sync ::write-to-stdout
 :trigger/doc "Writes the window contents to stdout when this window's watermark has been exceeded"}
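The firing condition can be written as a small pure function. This is a sketch of the rule as described above rather than Onyx's code; watermark-exceeded? is a hypothetical helper, and the bounds are plain numbers chosen for illustration.

```clojure
;; Hypothetical helper: true when the segment's window-key value lies
;; above the upper bound of an extent.
(defn watermark-exceeded?
  [window segment upper-bound]
  (> (get segment (:window/window-key window)) upper-bound))

(def example-window {:window/window-key :event-time})

(watermark-exceeded? example-window {:event-time 1005} 1000) ;; => true
(watermark-exceeded? example-window {:event-time 998} 1000)  ;; => false
```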

:percentile-watermark

The trigger wakes up in reaction to a new segment being processed. It fires only if the value of :window/window-key in the segment exceeds the extent's lower bound plus the fraction of the range given by :trigger/watermark-percentage, a double greater than 0 and less than 1. This is an alternative to :watermark that lets you trigger once most of the data has arrived, without waiting for every last bit.

{:trigger/window-id :collect-segments
 :trigger/on :onyx.triggers/percentile-watermark
 :trigger/watermark-percentage 0.95
 :trigger/sync ::write-to-stdout
 :trigger/doc "Writes the window contents to stdout when this window's watermark is exceeded by 95% of its range"}
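The threshold arithmetic can be worked through with a short sketch. The helper names and the numeric bounds are assumptions for illustration; the formula is the one described above: lower bound plus the percentage of the range.

```clojure
;; Hypothetical helpers modelling the percentile-watermark condition.
(defn percentile-threshold
  "Lower bound plus pct of the extent's range."
  [lower upper pct]
  (+ lower (* pct (- upper lower))))

(defn percentile-watermark-exceeded?
  [lower upper pct window-key-value]
  (> window-key-value (percentile-threshold lower upper pct)))

;; An extent covering 0 to 1000 with a percentage of 0.95 has a
;; threshold of about 950.
(percentile-watermark-exceeded? 0 1000 0.95 960) ;; => true
(percentile-watermark-exceeded? 0 1000 0.95 900) ;; => false
```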

Refinement Modes

A refinement mode allows you to articulate what should happen to the state of a window extent after a trigger has been invoked.

A refinement must implement the following interface:

(defn create-state-update [trigger state state-event]
  ;; create a state machine update from the window state and the event
  )

(defn apply-state-update [trigger state entry]
  ;; apply that update to the window state after the trigger has been called
  )

(def my-refinement
  {:refinement/create-state-update create-state-update
   :refinement/apply-state-update apply-state-update})
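As a minimal sketch of a filled-in refinement, the following hypothetical mode empties an extent after each firing. It assumes the window state is a vector (for example, one built up by a conj-style aggregation), and the shape of the update entry is made up for this example.

```clojure
;; Hypothetical refinement: reset the extent to an empty vector after firing.
(defn clear-create-state-update [trigger state state-event]
  ;; The entry records the intent to reset; it does not depend on the event.
  {:op :reset :to []})

(defn clear-apply-state-update [trigger state entry]
  ;; Apply the recorded update; leave the state alone for unknown entries.
  (if (= :reset (:op entry))
    (:to entry)
    state))

(def clear-on-fire
  {:refinement/create-state-update clear-create-state-update
   :refinement/apply-state-update clear-apply-state-update})
```

Splitting the work into "create an update" and "apply an update" keeps the update itself a plain value.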

Syncing

Onyx leaves it up to you what to do with your state during a trigger invocation. Set :trigger/sync to a fully qualified, namespaced keyword pointing to a function on the classpath at runtime. This function takes 5 arguments: the event map, the window map that this trigger is defined on, the trigger map, a state-event map, and the window state as an immutable value. Its return value is ignored.

This function is invoked when the trigger fires, and can perform any action with the window contents, such as syncing them to a database. It is called once per window instance. In other words, if a fixed window exists with 5 instances, the firing of a :timer trigger will call the sync function 5 times. You can use lifecycles to supply any stateful connections necessary to sync your data. Values supplied by lifecycles are available through the first parameter, the event map.
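A sync function with this 5-argument shape might look like the sketch below. An atom stands in for a real database connection, and the :my.app/store key is a made-up name for a value that a lifecycle would have placed in the event map.

```clojure
;; Hypothetical sync function. Assumes a lifecycle has put an atom into the
;; event map under :my.app/store (an invented key, not part of Onyx).
(defn write-to-store
  [event window trigger state-event state]
  (swap! (:my.app/store event) conj {:window (:window/id window)
                                     :state state})
  ;; The return value is ignored, so return nil explicitly.
  nil)

;; Usage outside of Onyx, with a stand-in event map:
(def store (atom []))
(write-to-store {:my.app/store store} {:window/id :collect-segments} {} {} [1 2])
@store ;; => [{:window :collect-segments, :state [1 2]}]
```

Because the function is called once per window instance, each call appends one entry.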

Trigger Specification

See the Information Model chapter for an exact specification of what values the Trigger maps need to supply. Here we describe what each of the keys means.

Each key name below is followed by its description.

:trigger/id

An id for the trigger that is unique over the window that it is placed on. As of 0.10.0, :trigger/id is required. It is useful for resuming trigger state via resume points.

:trigger/window-id

A :window/id specified in the collection of windows.

:trigger/refinement

Fully qualified namespaced keyword for the mode of refinement.

:trigger/on

Fully qualified namespaced keyword for the trigger implementation that decides whether to fire in reaction to an event, e.g. :onyx.triggers/segment.

:trigger/sync

Fully qualified namespaced keyword of a function to call with the state.

:trigger/emit

A fully qualified, namespaced keyword pointing to a function on the classpath at runtime. This function takes 5 arguments: the event map, the window map that this trigger is defined on, the trigger map, a state-event map, and the window state as an immutable value. It must return a segment, or vector of segments, which will flow downstream.

:trigger/fire-all-extents?

When true, fires every extent of a window in response to a trigger.

:trigger/doc

An optional docstring explaining the trigger’s purpose.
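Putting several of these keys together, a trigger that emits segments downstream might be sketched as follows. The emit-count function, the :emit-count id, and the shape of the emitted segment are hypothetical, and the example assumes the window state is a collection of segments.

```clojure
;; Hypothetical emit function: summarizes the window state as one segment.
(defn emit-count
  [event window trigger state-event state]
  {:window-id (:window/id window)
   :n (count state)})

;; A trigger map using the keys described above.
(def example-trigger
  {:trigger/id :emit-count
   :trigger/window-id :collect-segments
   :trigger/on :onyx.triggers/segment
   :trigger/threshold [5 :elements]
   :trigger/fire-all-extents? true
   :trigger/emit ::emit-count
   :trigger/doc "Emits a count of the collected segments every 5 segments"})
```

Unlike a :trigger/sync function, whose return value is ignored, the value returned by the emit function flows downstream as a segment.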
