Lifecycles

Lifecycles are a feature that allows you to control code that executes at particular points during task execution on each peer. Lifecycles are data driven and composable.

Summary

There are several points during a task at which Onyx can execute arbitrary code. Onyx lets you plug in functions that it calls before a task, after a task, before a batch, and after a batch on every peer. Additionally, another lifecycle hook allows you to delay starting a task in case you need to do some work first, like acquiring a lock under contention. A peer’s lifecycle is isolated to itself, and lifecycles never coordinate across peers. Usage of lifecycles is entirely optional. Lifecycle data is submitted as a data structure at job submission time.

Lifecycle Phases

Before task set up

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a boolean value indicating whether to start the task or not. If false, the process backs off for a preconfigured amount of time and calls this function again. Useful for lock acquisition. This function is called prior to any processes inside the task becoming active.
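
As a minimal sketch of the lock-acquisition use case, the function below tries to claim a lock and reports success or failure as a boolean. The atom stands in for a real external lock, and try-acquire! and the :my.ns/owner-id key are hypothetical names chosen for this example, not part of Onyx.

```clojure
;; Hypothetical stand-in for an external lock: holds the current owner, or nil when free.
(defonce lock-owner (atom nil))

(defn try-acquire!
  "Atomically claims the lock for owner. Returns true on success, false if it is already held."
  [owner]
  (compare-and-set! lock-owner nil owner))

(defn start-task? [event lifecycle]
  ;; :my.ns/owner-id is an illustrative key on the lifecycle map, not an Onyx key.
  ;; Returning false asks for a back-off and another call later.
  (try-acquire! (:my.ns/owner-id lifecycle)))
```

Because the function only returns true once the lock is held, the task does not start until acquisition succeeds.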

Before task execution

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called after processes in the task are launched, but before the peer listens for incoming segments from other peers.

Before batch start

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called prior to receiving a batch of segments from the reading function.

After reading batch of segments

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called immediately after a batch of segments has been read by the peer. The segments are available in the event map under the key :onyx.core/batch.
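
For illustration, a function for this phase might inspect the batch and record its size. This is a sketch only: :my.ns/batch-size is a hypothetical key picked for the example, and the batch is assumed to be a countable collection of segments.

```clojure
(defn after-read-batch [event lifecycle]
  (let [batch (:onyx.core/batch event)]
    (println "Read" (count batch) "segments.")
    ;; The returned map is merged into the event map.
    ;; :my.ns/batch-size is a made-up key for this example.
    {:my.ns/batch-size (count batch)}))
```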

After apply fn batch

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called after the :onyx/fn has been applied to the input segments and created output segments.

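After batch

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called once after each batch has been processed.

After task execution

A function that takes two arguments - an event map, and the matching lifecycle map. Must return a map that is merged back into the original event map. This function is called before the peer relinquishes its task. No more segments will be received.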

Handle Exception

One or more exception handlers may be defined for exceptions thrown during any lifecycle execution except after-task-stop. If a handler is present, the exception is caught and passed to it. See the details on the Onyx cheat sheet.

Example

Let’s work with an example to show how lifecycles work. Suppose you want to print out a message at all the possible lifecycle hooks. You’d start by defining 8 functions for the 8 hooks:

(ns my.ns)

(defn start-task? [event lifecycle]
  (println "Checking whether the task may start.")
  true)

(defn before-task-start [event lifecycle]
  (println "Executing once before the task starts.")
  {})

(defn after-task-stop [event lifecycle]
  (println "Executing once after the task is over.")
  {})

(defn before-batch [event lifecycle]
  (println "Executing once before each batch.")
  {})

(defn after-read-batch [event lifecycle]
  (println "Executing once after this batch has been read.")
  {})

(defn after-apply-fn [event lifecycle]
  (println "Executing once after the onyx/fn has been called on the input segments.")
  {})

(defn after-batch [event lifecycle]
  (println "Executing once after each batch.")
  {})

(defn handle-exception [event lifecycle lifecycle-phase e]
  (println "Caught exception: " e)
  (println "Returning :restart, indicating that this task should restart.")
  :restart)

Notice that all lifecycle functions return maps except start-task? and handle-exception. Each returned map is merged back into the event map that the function received. start-task? is a boolean function that allows you to block and back off if you don’t want to start the task quite yet. It is called periodically for as long as it returns false. If more than one start-task? is specified in your lifecycles, they must all return true for the task to begin. start-task? is invoked before before-task-start. handle-exception instead returns a keyword, here :restart, that tells Onyx what to do with the task.
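
The start-task? rules can be modeled in a few lines. This is a sketch of the behavior described above, not Onyx's internal code: all-start? and wait-until-ready are hypothetical helpers, entries is assumed to be a sequence of [start-fn lifecycle-map] pairs, and backoff-ms stands in for the preconfigured back-off time.

```clojure
(defn all-start?
  "True only when every start-task? function in entries returns true."
  [event entries]
  (every? (fn [[start-fn lifecycle]] (start-fn event lifecycle)) entries))

(defn wait-until-ready
  "Polls the start-task? functions, sleeping backoff-ms between attempts,
  and returns :started once all of them agree."
  [event entries backoff-ms]
  (loop []
    (if (all-start? event entries)
      :started
      (do (Thread/sleep (long backoff-ms))
          (recur)))))
```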

Next, define a map that wires all these functions together by mapping predefined keywords to the functions:

(def calls
  {:lifecycle/start-task? start-task?
   :lifecycle/before-task-start before-task-start
   :lifecycle/before-batch before-batch
   :lifecycle/after-read-batch after-read-batch
   :lifecycle/after-apply-fn after-apply-fn
   :lifecycle/after-batch after-batch
   :lifecycle/after-task-stop after-task-stop
   :lifecycle/handle-exception handle-exception})

Each of these 8 keys maps to a function. All of these keys are optional, so you can mix and match depending on which functions you actually need to use.

Finally, create a lifecycle data structure by pointing :lifecycle/calls to the fully qualified namespaced keyword that represents the calls map that we just defined. Pass it to your onyx.api/submit-job call:

(def lifecycles
  [{:lifecycle/task :my-task-name-here
    :lifecycle/calls :my.ns/calls
    :lifecycle/doc "Test lifecycles and print a message at each stage"}])

(onyx.api/submit-job
  peer-config
  {...
   :lifecycles lifecycles
   ...})
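
To make the keyword indirection concrete, here is one way a namespaced keyword such as :my.ns/calls could be turned back into the value of the var it names. This is a sketch under assumptions: resolve-calls is a hypothetical helper, it relies on requiring-resolve from Clojure 1.10 or later, and it is not presented as how Onyx itself performs the lookup.

```clojure
(defn resolve-calls
  "Looks up the var named by a namespaced keyword, loading its namespace
  if needed, and returns the var's value (nil if the var does not exist)."
  [kw]
  (let [sym (symbol (namespace kw) (name kw))]
    (some-> (requiring-resolve sym) deref)))
```

Because the lifecycle entry carries only a keyword, the job stays plain data, and the peer needs the named namespace available to find the functions.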

It is also possible to have a lifecycle apply to every task in a workflow by specifying :lifecycle/task :all. This is useful for instrumenting your tasks with metrics, error handling, or debugging information.

(def lifecycles
  [{:lifecycle/task :all
    :lifecycle/calls :my.ns/add-metrics
    :lifecycle/doc "Instruments all tasks in a workflow with the example function 'add-metrics'"}])

You can supply as many sets of lifecycles as you want. They are invoked in the order that they are supplied in the vector, giving you a predictable sequence of calls. Be sure that every namespace named by these keywords, along with the functions it refers to, is on the classpath and required on the peer that will be executing them.
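
The ordering guarantee can be pictured as a reduce over the lifecycle vector. In this sketch, run-phase is a hypothetical helper, and each entry is assumed to be a map holding an already resolved :calls map and its :lifecycle map. That later functions see the maps merged in by earlier ones is an assumption of this model rather than something stated above.

```clojure
(defn run-phase
  "Calls each entry's function for phase in vector order,
  merging every returned map into the event."
  [event phase entries]
  (reduce (fn [ev {:keys [calls lifecycle]}]
            (if-let [f (get calls phase)]
              (merge ev (f ev lifecycle))
              ev)) ; entries without a function for this phase are skipped
          event
          entries))
```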

Example project: lifecycles
