In this section, we talk about Watermarks. Watermarks are a feature that interacts with Windows and Triggers. Watermarks give you a way to use windows in a way that can relate to the event time/ingest time of the data that is being processed.
Watermarks can be used in two ways:
Via :onyx/assign-watermark-fn
on the input task, which will be applied to the segments read from an input task, and must return the number of milliseconds since epoch, or nil if none exists for that segment.
Via native support in your onyx input plugin, provided by implementing onyx.plugin.protocols/WatermarkedInput
protocol. Only onyx-kafka plugin is currently supported, however more plugin support is being worked on.
Watermarks work as follows:
The input sources read a message with a watermarked time, if that time is later than the current watermark update the watermark.
The minimum watermark is computed across all of the input tasks and their peers, this is the new watermark for the inputs.
The watermark flows through the DAG. The minimum watermark read by each a task’s edges is used the current watermark for that task.
As the minimum watermark increases for a task, all triggers are called with a state-event, where the event-type is :watermark
, and where the state-event includes a :watermarks
map, containing the current watermarks.
The watermarks map is intended to be used by the trigger to decide whether to fire for a given window.
The current minimum watermark for a task flows down to its downstream tasks.
The overall effect is that if you have two input tasks, that are offset in event-time, it is is easy to ensure that no windows are processed until both tasks have passed a given time.
The easiest use of watermarks is via the :onyx.triggers/watermark
trigger, which only fires when the upper-bound of a window is passed, with an optional delay :trigger/delay
.
Can you improve this documentation? These fine people already did:
Lucas Bradstreet & Laszlo (@laczoka on Twitter)Edit on GitHub
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close