flux is a workflow scripting framework that can be used to model business
application processes. The framework provides 4 basic concepts:
A Workstream is a logical, sequential grouping of a list of Activities,
also referred to as a workflow.
A Activity is a step within a workflow. In order to better support
the modeling of application processes, various types of control constructs
are provided for conditonals and loopings.
A Job is the external trigger that starts the execution of a workflow,
providing a shared context for all the Activities within a workflow.
Any object that implements the Schedulable interface can be used as
the executor, allowing multiple Workstreams, multiple Activities
to run concurrently.
Add the following dependency to your project.clj file:
[io.czlab/flux "2.0.1"]
(ns demo.app
(:require [czlab.flux.wflow.core :as w]
[czlab.basal.scheduler :as s]))
(def cpu (doto (s/scheduler<> "engine")(.activate {})))
(def wflow1 (w/workstream task1 task2 task3))
(def wflow2 (w/workstream task4 task5 task6))
(->> (jobCreatedFromWeb)
(.execWith wflow1))
(->> (jobCreatedFromIOT)
(.execWith wflow2))
The script Activity is where the user writes code. It is essentially a
wrapper on a function [2-arg] with arg-1 being the current step in the
workflow, and arg-2 being the context of the workflow - the job.
User can get/set values to the job and perform application specific
work. In general the return value of the function should be nil.
However, if the logic decides that a new Activity is to be returned,
then the engine will run this new Activity, implying that the
user can dynamically alter the current workstream.
(ns demo.app
(:require [czlab.flux.wflow.core :as w
czlab.basal.core :as c
czlab.basal.scheduler :as s]))
;global scheduler
(def cpu (doto (s/scheduler<> "test") (.activate nil)))
(let
[ws
(w/workStream<>
(w/group<>
(w/script<> #(println "job data = " %2 " at step 1"))
(w/script<> #(println "job data = " %2 ", at step 2"))))
job (w/job<> cpu ws)]
(.execWith ws job))
(let
[ws
(w/workStream<>
(w/group<>
(w/script<>
(fn [_ job]
(println "job data = " job ", at step 1")
(w/script<>
(fn [_ _]
(c/do->nil
(println "i changed the flow!"))))))
(w/script<> #(println "you won't see me"))))
job (w/job<> cpu ws)]
(.execWith ws job))
The fork Activity can be used to split out a set of Activities which
are then scheduled to run asynchronously in parallel. The workflow can
pause with an and Join until all children have completed their work
and returned, or with an or Join, in which case only one child has to
complete and return, or with an nil Join which tells the engine to
continue without waiting. A timeout can be set to prevent infinite wait,
allowing the user to handle such error condition.
(ns demo.app
(:require [czlab.flux.wflow.core :as w
czlab.basal.core :as c
czlab.basal.scheduler :as s]))
;global scheduler
(def cpu (doto (s/scheduler<> "test") (.activate nil)))
;fork two tasks, wait until they are done, then continue
(let
[ws
(w/workStream<>
(w/fork<> {:join :and}
(w/script<> #(c/do->nil
(c/pause 1000)
(.setv %2 :x 5)))
(w/script<> #(c/do->nil
(c/pause 1500)
(.setv %2 :y 7))))
(w/script<>
#(c/do->nil
(assert (= 12
(+ (.getv %2 :x)
(.getv %2 :y)))))))
job (w/job<> cpu ws)]
(.execWith ws job))
The decision Activity is the standard if-then-else control construct.
(ns demo.app
(:require [czlab.flux.wflow.core :as w
czlab.basal.core :as c
czlab.basal.scheduler :as s]))
;global scheduler
(def cpu (doto (s/scheduler<> "test") (.activate nil)))
(let [ws
(w/workStream<>
(w/decision<>
(fn [job] (= "ok" (.getv job :state)))
(w/script<> #(c/do->nil
(.setv %2 :a 10)))
(w/script<> #(c/do->nil
(.setv %2 :a 5)))))
job (w/job<> cpu ws)]
(.execWith ws job))
The choice Activity is the standard switch control construct.
(ns demo.app
(:require [czlab.flux.wflow.core :as w
czlab.basal.core :as c
czlab.basal.scheduler :as s]))
;global scheduler
(def cpu (doto (s/scheduler<> "test") (.activate nil)))
(let [ws
(w/workStream<>
(w/choice<>
(fn [job] (if (= "ok" (.getv job :reply)) "x" "?"))
(w/script<> #(c/do->nil (.setv %2 :out -1)))
"x" (w/script<> #(c/do->nil (.setv %2 :out 10)))
"y" (w/script<> #(c/do->nil (.setv %2 :out 5)))))
job (w/job<> cpu ws)]
(.execWith ws job))
The floop Activity is the standard for loop control construct. User
can set/configure the looping range, and access the current loop index.
(ns demo.app
(:import [czlab.flux.wflow RangeExpr])
(:require [czlab.flux.wflow.core :as w
czlab.basal.core :as c
czlab.basal.scheduler :as s]))
;global scheduler
(def cpu (doto (s/scheduler<> "test") (.activate nil)))
(let [ws
(w/workStream<>
(w/floop<>
(reify RangeExpr
(lower [_ job] (if (= "ok" (.getv job :x)) 0 4))
(upper [_ job] 10))
(w/script<> #(c/do->nil
(->> (inc (.getv %2 :total))
(.setv %2 :total))))))
job (w/job<> cpu ws)]
(.execWith ws job))
The wloop Activity is the standard while loop control construct.
(ns demo.app
(:require [czlab.flux.wflow.core :as w
czlab.basal.core :as c
czlab.basal.scheduler :as s]))
;global scheduler
(def cpu (doto (s/scheduler<> "test") (.activate nil)))
(let [ws
(w/workStream<>
(w/wloop<>
(fn [job] (< (.getv job :cnt) 10))
(w/script<> #(c/do->nil
(->> (inc (.getv %2 :cnt))
(.setv %2 :cnt))))))
job (w/job<> cpu ws)]
(.setv job :cnt 0)
(.execWith ws job))
The postpone Activity can be used to pause a workflow, rescheduling it
after the time limit has expired.
(ns demo.app
(:require [czlab.flux.wflow.core :as w
czlab.basal.core :as c
czlab.basal.scheduler :as s]))
;global scheduler
(def cpu (doto (s/scheduler<> "test") (.activate nil)))
(let [now (System/currentTimeMillis)
ws
(w/workStream<>
(w/postpone<> 5)
(w/script<> (fn [_ _]
(c/do->nil
(println "boom! something exploded!")))))
job (w/job<> cpu ws)]
(.execWith ws job))
The group Activity is used to sequence a set of Activities.
(ns demo.app
(:require [czlab.flux.wflow.core :as w
czlab.basal.core :as c
czlab.basal.scheduler :as s]))
;global scheduler
(def cpu (doto (s/scheduler<> "test") (.activate nil)))
(let [ws
(w/workStream<>
(w/group<>
(w/script<> (fn [_ _]
(c/do->nil
(println "this is step 1"))))
(w/postpone<> 5)
(w/script<> (fn [_ _]
(c/do->nil
(println "boom! something exploded after 5 secs!"))))))
job (w/job<> cpu ws)]
(.execWith ws job))
Please use the project's GitHub issues page for all questions, ideas, etc. Pull requests welcome. See the project's GitHub contributors page for a list of contributors.
Copyright © 2013-2020 Kenneth Leung
Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.
Can you improve this documentation?Edit on GitHub
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |