Lifecycles are a feature that allows you to control code that executes at
particular points during task execution on each peer. Lifecycles are
data driven and composable.
There are several interesting points at which to execute arbitrary code
during a task in Onyx. Onyx lets you plug in functions that it calls
before a task, after a task, before a batch, and after a batch on every
peer. Additionally, there is another lifecycle hook that allows you to
delay starting a task in case you need to do some work first, such as
acquiring a lock under contention. A peer’s lifecycle is isolated to
itself, and lifecycles never coordinate across peers. Use of lifecycles
is entirely optional. Lifecycle data is submitted as a data structure at
job submission time.
start-task?: a function that takes two arguments, an event map and the
matching lifecycle map. It must return a boolean value indicating whether
to start the task. If it returns false, the process backs off for a
preconfigured amount of time and calls this function again. This is
useful for lock acquisition. The function is called before any processes
inside the task become active.
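
As a rough illustration of the lock-acquisition use case, the sketch below
guards a task with an in-process lock. The atom `lock-holder`, the helper
`try-acquire!`, and the `:my.lock/owner` key are hypothetical names made
up for this example; a real deployment would more likely talk to an
external lock service.

```clojure
(ns my.locks)

;; Hypothetical in-process lock: holds the owner id, or nil when free.
(defonce lock-holder (atom nil))

(defn try-acquire!
  "Atomically claims the lock for owner. Returns true on success,
  false if the lock is already held."
  [owner]
  (compare-and-set! lock-holder nil owner))

(defn start-task?
  "Lifecycle hook: returns true only once the lock has been acquired.
  Returning false asks Onyx to back off and call this function again."
  [event lifecycle]
  ;; :my.lock/owner is an assumed custom key placed in the lifecycle map.
  (try-acquire! (:my.lock/owner lifecycle)))
```

The first caller gets true and the task starts; any later caller keeps
getting false until the lock is released.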
before-task-start: a function that takes two arguments, an event map and
the matching lifecycle map. It must return a map, which is merged back
into the original event map. This function is called after the processes
in the task are launched, but before the peer listens for incoming
segments from other peers.
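
A common use of this hook is to open a resource once and make it available
to later hooks through the event map. The sketch below assumes a
hypothetical `open-connection` helper and hypothetical `:my.db/uri` and
`:my.db/conn` keys; the final `merge` only models what the text describes
and is not Onyx's internal code.

```clojure
(ns my.resources)

(defn open-connection
  "Hypothetical stand-in for opening a real resource (database, socket, ...)."
  [uri]
  {:uri uri :open? true})

(defn before-task-start
  "Lifecycle hook: returns a map that gets merged into the event map."
  [event lifecycle]
  ;; :my.db/uri is an assumed custom key carried by the lifecycle map.
  {:my.db/conn (open-connection (:my.db/uri lifecycle))})

;; Modelling the merge by hand to show the resulting event map.
(def event-after
  (let [event {:onyx.core/batch []}]
    (merge event (before-task-start event {:my.db/uri "db://example"}))))
```

After the merge, `event-after` still carries the original keys and also
holds the connection under `:my.db/conn`.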
before-batch: a function that takes two arguments, an event map and the
matching lifecycle map. It must return a map, which is merged back into
the original event map. This function is called before a batch of
segments is received from the reading function.
after-read-batch: a function that takes two arguments, an event map and
the matching lifecycle map. It must return a map, which is merged back
into the original event map. This function is called immediately after a
batch of segments has been read by the peer. The segments are available
in the event map under the key :onyx.core/batch.
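
For instance, a hook at this point can inspect the batch that was just
read. This minimal sketch only counts the segments; the
`:my.stats/last-batch-size` key is a hypothetical name chosen for the
example.

```clojure
(defn after-read-batch
  "Lifecycle hook: reports the size of the batch that was just read."
  [event lifecycle]
  (let [batch (:onyx.core/batch event)]
    (println "Read" (count batch) "segments")
    ;; The returned map is merged back into the event map.
    {:my.stats/last-batch-size (count batch)}))
```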
after-apply-fn: a function that takes two arguments, an event map and the
matching lifecycle map. It must return a map, which is merged back into
the original event map. This function is called after :onyx/fn has been
applied to the input segments and the output segments have been created.
after-task-stop: a function that takes two arguments, an event map and
the matching lifecycle map. It must return a map, which is merged back
into the original event map. This function is called before the peer
relinquishes its task. No more segments will be received.
handle-exception: one or more exception handlers may be defined for
exceptions thrown during any lifecycle execution except after-task-stop.
If a handler is present, the exception is caught and passed to it. See
the Onyx cheat sheet for details.
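
A handler can choose its answer based on the exception it receives. The
sketch below assumes the four-argument signature used by the handler in
this guide's example, and it assumes that `:kill` is an accepted return
value alongside `:restart`. Only `:restart` appears in this guide, so
check the cheat sheet before relying on `:kill`.

```clojure
(defn handle-exception
  "Restarts the task on I/O failures and gives up on anything else."
  [event lifecycle lifecycle-phase e]
  (println "Exception during" lifecycle-phase ":" (.getMessage e))
  (if (instance? java.io.IOException e)
    :restart   ; possibly transient, so try the task again
    :kill))    ; assumed keyword for stopping the job
```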
Let’s work with an example to show how lifecycles work. Suppose you want
to print out a message at all the possible lifecycle hooks. You’d start
by defining a function for each hook:
    (ns my.ns)

    (defn start-task? [event lifecycle]
      (println "Executing once before the task starts.")
      true)

    (defn before-task-start [event lifecycle]
      (println "Executing once before the task starts.")
      {})

    (defn after-task-stop [event lifecycle]
      (println "Executing once after the task is over.")
      {})

    (defn before-batch [event lifecycle]
      (println "Executing once before each batch.")
      {})

    (defn after-read-batch [event lifecycle]
      (println "Executing once after this batch has been read.")
      {})

    (defn after-apply-fn [event lifecycle]
      (println "Executing once after the onyx/fn has been called on the input segments.")
      {})

    (defn after-batch [event lifecycle]
      (println "Executing once after each batch.")
      {})

    (defn handle-exception [event lifecycle lifecycle-phase e]
      (println "Caught exception: " e)
      (println "Returning :restart, indicating that this task should restart.")
      :restart)
Notice that all lifecycle functions return maps except start-task? and
handle-exception, which returns a keyword such as :restart. Each returned
map is merged back into the event parameter that you received.
start-task? is a boolean function that allows you to block and back
off if you don’t want to start the task quite yet. This function will be
called periodically as long as false is returned. If more than one
start-task? is specified in your lifecycles, they must all return
true for the task to begin. start-task? is invoked before
before-task-start.
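
The "all must return true" rule can be modelled in a few lines. This is
only a sketch of the rule as stated, not Onyx's implementation; the
function names are hypothetical, and for brevity every function receives
the same lifecycle map here.

```clojure
(defn lock-free? [event lifecycle] true)    ; hypothetical check that passes
(defn cache-warm? [event lifecycle] false)  ; hypothetical check that fails

(defn ready-to-start?
  "True only when every supplied start-task? function returns true."
  [event lifecycle start-fns]
  (every? (fn [start-fn] (start-fn event lifecycle)) start-fns))
```

With both checks supplied the task would keep waiting, because
`cache-warm?` returns false; with `lock-free?` alone it would start.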
Next, define a map that wires all these functions together by mapping
predefined keywords to the functions:
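
Such a map might look like the following sketch. It assumes that each key
is the hook name in the `lifecycle` keyword namespace, matching the style
of `:lifecycle/calls` and `:lifecycle/task`, and that the functions from
the example above are defined in `my.ns`. Verify the key names against
the Onyx cheat sheet.

```clojure
;; Evaluated in my.ns, after the function definitions above.
(def calls
  {:lifecycle/start-task? start-task?
   :lifecycle/before-task-start before-task-start
   :lifecycle/before-batch before-batch
   :lifecycle/after-read-batch after-read-batch
   :lifecycle/after-apply-fn after-apply-fn
   :lifecycle/after-batch after-batch
   :lifecycle/after-task-stop after-task-stop
   :lifecycle/handle-exception handle-exception})
```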
Each of these keys maps to a function. All of these keys are optional,
so you can mix and match depending on which functions you actually need
to use.
Finally, create a lifecycle data structure by pointing
:lifecycle/calls to the fully qualified namespaced keyword that
represents the calls map that we just defined. Pass it to your
onyx.api/submit-job call:
    (def lifecycles
      [{:lifecycle/task :my-task-name-here
        :lifecycle/calls :my.ns/calls
        :lifecycle/doc "Test lifecycles and print a message at each stage"}])

    (onyx.api/submit-job
      peer-config
      {...
       :lifecycles lifecycles
       ...})
It is also possible to have a lifecycle apply to every task in a
workflow by specifying :lifecycle/task :all. This is useful for
instrumenting your tasks with metrics, error handling, or debugging
information.
    (def lifecycles
      [{:lifecycle/task :all
        :lifecycle/calls :my.ns/add-metrics
        :lifecycle/doc "Instruments all tasks in a workflow with the example function 'add-metrics'"}])
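
The `add-metrics` calls map is not defined in this guide. One possible
shape is sketched below: it counts the segments read per task. The atom
`segment-counts` and the function `count-segments` are hypothetical, and
the sketch assumes that the event map carries the task name under
`:onyx.core/task`.

```clojure
(ns my.ns)

;; Hypothetical metrics store: task name -> segments read so far.
(defonce segment-counts (atom {}))

(defn count-segments
  "Lifecycle hook: adds the size of the current batch to the task's total."
  [event lifecycle]
  ;; :onyx.core/task is assumed to hold the current task's name.
  (swap! segment-counts update (:onyx.core/task event) (fnil + 0)
         (count (:onyx.core/batch event)))
  {})

;; Calls map referenced by :lifecycle/calls :my.ns/add-metrics.
(def add-metrics
  {:lifecycle/after-read-batch count-segments})
```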
You can supply as many sets of lifecycles as you want. They are invoked
in the order that they are supplied in the vector, giving you a
predictable sequence of calls. Make sure that the namespaces containing
the referenced calls maps and functions are on the classpath of, and
loaded by, every peer that will execute them.
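
The ordering guarantee can be pictured with a small model. The `run-hook`
function below is a hypothetical sketch, not Onyx's code: it walks the
lifecycle entries in vector order and merges each returned map into the
event. Whether a later function sees the keys added by an earlier one is
an assumption of this model.

```clojure
(defn run-hook
  "Calls one hook across several lifecycle entries, in vector order,
  merging each returned map into the event before the next call."
  [event hook-key entries]
  (reduce (fn [ev {:keys [calls lifecycle]}]
            (if-let [f (get calls hook-key)]
              (merge ev (f ev lifecycle))
              ev)) ; entries without this hook are skipped
          event
          entries))

(def trace
  (run-hook {:order []}
            :lifecycle/before-batch
            [{:calls {:lifecycle/before-batch
                      (fn [ev _] {:order (conj (:order ev) :first)})}
              :lifecycle {}}
             {:calls {:lifecycle/before-batch
                      (fn [ev _] {:order (conj (:order ev) :second)})}
              :lifecycle {}}]))
;; (:order trace) evaluates to [:first :second]
```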