{:trigger/window-id :collect-segments
:trigger/on :onyx.triggers/timer
:trigger/period [3 :seconds]
:trigger/sync ::write-to-dynamo
:trigger/doc "Writes state to DynamoDB every 5 seconds, discarding intermediate state"}
In this section, we talk about Triggers. Triggers are a feature that interact with Windows. Windows capture and bucket data over time. Triggers let you release the captured data over a variety stimuli.
Windows capture data over time and place segments into discrete, possibly overlapping buckets. By itself, this is a relatively useless concept. In order to harness the information that has been captured and rolled up, we need to move it somewhere. Triggers let us interact with the state in each extent of a window.
Example project: aggregation |
Onyx ships a number of trigger implementations that can be used out of the box. Each trigger fires in response to a particular stimulus. All triggers implemented in Onyx core fire at task completion. We outline each here and show an example of each in action.
:timer
This trigger sleeps for a duration of :trigger/period
. When it is done
sleeping, the :trigger/sync
function is invoked with its usual
arguments. The trigger goes back to sleep and repeats itself.
{:trigger/window-id :collect-segments
:trigger/on :onyx.triggers/timer
:trigger/period [3 :seconds]
:trigger/sync ::write-to-dynamo
:trigger/doc "Writes state to DynamoDB every 5 seconds, discarding intermediate state"}
:segment
Trigger wakes up in reaction to a new segment being processed. Trigger
only fires once every :trigger/threshold
segments. When the threshold
is exceeded, the count of new segments goes back to 0
, and the looping
proceeds again in the same manner.
{:trigger/window-id :collect-segments
:trigger/on :onyx.triggers/segment
:trigger/fire-all-extents? true
:trigger/threshold [5 :elements]
:trigger/sync ::write-to-stdout
:trigger/doc "Writes the window contents to stdout every 5 segments"}
:punctuation
Trigger wakes up in reaction to a new segment being processed. Trigger
only fires if :trigger/pred
evaluates to true
. The signature of
:trigger/pred
is of arity-2: trigger, state-event
. Punctuation
triggers are often useful to send signals through that indicate that
no more data will be coming through for a particular window of time.
{:trigger/window-id :collect-segments
:trigger/on :onyx.triggers/punctuation
:trigger/pred ::trigger-pred
:trigger/sync ::write-to-stdout
:trigger/doc "Writes the window contents to std out if :trigger/pred is true for this segment"}
:watermark
Trigger wakes up in reaction to a new segment being processed. Trigger
only fires if the value of :window/window-key
in the segment exceeds
the upper-bound in the extent of an active window. This is a shortcut
function for a punctuation trigger that fires when any piece of data
has a time-based window key that is above another extent, effectively
declaring that no more data for earlier windows will be arriving.
{:trigger/window-id :collect-segments
:trigger/on :onyx.triggers/watermark
:trigger/sync ::write-to-stdout
:trigger/doc "Writes the window contents to stdout when this window's watermark has been exceeded"}
:percentile-watermark
Trigger wakes up in reaction to a new segment being processed. Trigger
only fires if the value of :window/window-key
in the segment exceeds
the lower-bound plus the percentage of the range as indicated by
:trigger/watermark-percentage
, a double
greater than 0
and less
than 1
. This is an alternative to :watermark
that allows you to
trigger on most of the data arriving, not necessarily every last
bit.
{:trigger/window-id :collect-segments
:trigger/on :onyx.triggers/percentile-watermark
:trigger/watermark-percentage 0.95
:trigger/sync ::write-to-stdout
:trigger/doc "Writes the window contents to stdout when this window's watermark is exceeded by 95% of its range"}
A refinement mode allows you to articulate what should happen to the state of a window extent after a trigger has been invoked.
A refinement must implement the following interface:
(defn create-state-update [trigger state state-event]
;; create a state machine update from the window state and the event
)
(defn apply-state-update [trigger state entry]
;; apply that update to the window state after the trigger has been called
)
(def my-refinement
{:refinement/create-state-update create-state-update
:refinement/apply-state-update apply-state-update})
Onyx offers you the ultimate flexibility on what to do with your state
during a trigger invocation. Set :trigger/sync
to a fully qualified,
namespaced keyword pointing to a function on the classpath at runtime.
This function takes 5 arguments: The
event
map, the
window
map that this trigger is defined on, the
trigger
map, a
state-event
map, and the window state as an immutable value. Its return value is
ignored.
This function is invoked when the trigger fires, and is used to do any arbitrary action with the window contents, such as sync them to a database. It is called once per window instance. In other words, if a fixed window exists with 5 instances, the firing of a Timer trigger will call the sync function 5 times. You can use lifecycles to supply any stateful connections necessary to sync your data. Supplied values from lifecycles will be available through the first parameter - the event map.
See the Information Model chapter for an exact specification of what values the Trigger maps need to supply. Here we will describe what each of the keys mean.
key name | description |
---|---|
| An id for the trigger that is unique over the window that it is placed on. As of 0.10.0 |
| A |
| Fully qualified namespaced keyword for the mode of refinement. |
| Fully qualified namespaced keyword for the trigger
called to determine whether to fire as a reaction e.g.
|
| Fully qualified namespaced keyword of a function to call with the state. |
| A fully qualified, namespaced keyword pointing to a function on the classpath at runtime. This function takes 5 arguments: the event map, the window map that this trigger is defined on, the trigger map, a state-event map, and the window state as an immutable value. It must return a segment, or vector of segments, which will flow downstream. |
| When true, fires every extent of a window in response to a trigger. |
| An optional docstring explaining the trigger’s purpose. |
Can you improve this documentation?Edit on GitHub
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close