Workflows are resilient programs that will continue execution even under different failure conditions.
Workflows encapsulate the execution/orchestration of Tasks, including Activities and child Workflows. They must also react to external events, deal with Timeouts, etc.
In this Clojure SDK programming model, a Temporal Workflow is a function declared with defworkflow:
(defworkflow my-workflow
  [params]
  ...)
A Workflow implementation consists of defining a (defworkflow) function. The platform invokes this function each time a new Workflow execution is started or retried. Returning from the function signals that the Workflow execution is complete. The result is available to the caller via temporal.client.core/get-result.
(require '[temporal.workflow :refer [defworkflow]])
(defworkflow my-workflow
  [{:keys [foo]}]
  ...)
Temporal uses the Event Sourcing pattern to recover the state of a Workflow object, including its threads and local variable values. In essence, the Workflow code is re-executed from the beginning whenever a Workflow state requires restoration. During replay, successfully executed Activities are not re-executed but return the result previously recorded in the Workflow event history.
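The replay behavior described above can be illustrated with a toy model in plain Clojure. This is only a sketch and not how the SDK is implemented: invoke!, run-workflow, order-workflow, and the vector-based history are hypothetical names introduced for illustration. The point it shows is that re-running the function from the beginning against a recorded history returns the recorded results instead of executing the activities again.

```clojure
;; Toy model of event-sourced replay. NOT the Temporal SDK: invoke!,
;; run-workflow, and the history format are hypothetical.

(defn invoke!
  "Return the recorded result for the next activity call if the history has
  one; otherwise execute f and append its result to the history."
  [{:keys [history cursor executed]} f]
  (let [i (dec (swap! cursor inc))]
    (if (< i (count @history))
      (nth @history i)                 ; replay: reuse the recorded result
      (let [result (f)]
        (swap! executed inc)           ; count activities that really ran
        (swap! history conj result)
        result))))

(defn run-workflow
  "Run workflow-fn from the beginning against an existing history vector.
  Returns the result, the updated history, and how many activities ran."
  [workflow-fn history]
  (let [ctx    {:history (atom history) :cursor (atom 0) :executed (atom 0)}
        result (workflow-fn ctx)]
    {:result result :history @(:history ctx) :executed @(:executed ctx)}))

(defn order-workflow
  "A hypothetical workflow that calls two activities in sequence."
  [ctx]
  (let [a (invoke! ctx (fn [] 10))
        b (invoke! ctx (fn [] (* a 2)))]
    (+ a b)))

;; First execution runs both activities and records their results.
(def first-run (run-workflow order-workflow []))

;; Replaying against the recorded history runs no activities at all.
(def replayed (run-workflow order-workflow (:history first-run)))
```

Replaying against a partial history such as [10] would reuse the first result and execute only the second activity, which is the situation after a worker crashes midway through a Workflow.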
Even though Temporal's replay capability brings resilience to your Workflows, you should not need to think about it when writing them. Instead, focus on implementing your business logic and requirements, and write your Workflows as if they would execute only once.
There are some things, however, to think about when writing your Workflows, namely determinism and isolation. We summarize these constraints here:
By default, Workflows are automatically registered simply by declaring a (defworkflow). You may optionally manually specify Workflows to register when creating Workers (see temporal.client.worker/worker-options).
It should be noted that the name of the Workflow, the arguments and signals that the Workflow accepts, and the data that the workflow returns are all part of a contract that you need to maintain across potentially long-lived instances. Therefore, refactoring code involving Workflow logic should be treated with care to avoid inadvertently breaking your contract.
In this Clojure SDK, developers manage Workflows with the following flow:
params passed to these functions will be forwarded to the Workflow and available as args in the request map of the Workflow.
(defworkflow my-workflow
  [{:keys [foo]}]
  ...)
(let [w (create-workflow client my-workflow {:task-queue "MyTaskQueue"})]
  (start w {:foo "bar"})
  @(get-result w))
A Temporal Workflow instance behaves like a Lightweight Process or Fiber. These types of designs support a high ratio of Workflow instances to CPUs, often in the range of 1000:1 or greater. Achieving this feat requires controlling the IO in and out of the instance to maximize resource sharing. Therefore, any LWP/Fiber implementation will generally provide its own IO constructs (e.g., mailboxes, channels, promises, etc.), and Temporal is no exception.
In this Clojure SDK, this support comes in a few different flavors:
Specific methods naturally return Workflow-safe Promises, such as invoking an Activity from a Workflow. The Clojure SDK integrates these Workflow-safe Promises with the promesa library. This section serves to document their use and limitations.
Instead, you must ensure that all promises originate with an SDK-provided function, such as temporal.activity/invoke or temporal.promise/rejected. For aggregating operations, use the Temporal-safe variants temporal.promise/all and temporal.promise/race.
What this means in practice is that any promise chain should generally start with some Temporal-native promise.
Do NOT do this:
(require '[promesa.core :as p])
(-> (p/resolved true)
    (p/then (fn [x] (comment "do something with x"))))
Starting a chain with (p/resolved) (or anything else that ultimately creates a promesa promise) will not work, and you will receive a run-time error.
The proper method is to ensure that a Temporal native operation starts the chain.
...
(require '[temporal.activity :as a])
(-> (a/invoke some-activity {:some "args"})
    (p/then (fn [x] (comment "do something with x"))))
The following situation can lead to a failure:
(-> (when some-condition
      (a/invoke some-activity {:some "args"}))
    (p/then (fn [x] ...)))
This fails when some-condition is false, because promesa will cast the scalar nil to (p/resolved nil), thus violating the origination rule. Instead, do this:
...
(require '[temporal.promise :as tp])
(-> (if some-condition
      (a/invoke some-activity {:some "args"})
      (tp/resolved false))
    (p/then (fn [x] ...)))
Doing so ensures that the origination rules are met regardless of the outcome of the conditional.
You may use temporal.workflow/await to efficiently park the Workflow until a provided predicate evaluates to true. The Temporal platform will re-evaluate the predicate at each major state transition of the Workflow.
Your Workflow may send or receive signals.
This SDK provides a core.async-inspired abstraction on Temporal Signals called signal-channels. To use signal channels, your Workflow may either block waiting for signals with temporal.signals/<! or use the non-blocking temporal.signals/poll. Either way, your Workflow first needs to obtain the signal-chan context by calling temporal.signals/create-signal-chan.
(require '[temporal.signals :as s])
(defworkflow my-workflow
  [args]
  (let [signals (s/create-signal-chan)
        ;; block until a "MySignal" message arrives
        message (s/<! signals "MySignal")]
    ...))
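Alternatively, you may opt to handle signals directly with temporal.signals/register-signal-handler!:
(require '[temporal.workflow :as w :refer [defworkflow]])
(defworkflow my-workflow
  [args]
  (let [state (atom 0)]
    ;; count every signal received, regardless of its name
    (s/register-signal-handler! (fn [signal-name args]
                                  (swap! state inc)))
    ;; park the Workflow until at least two signals have arrived
    (w/await (fn [] (> @state 1)))
    @state))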
Your Workflow may respond to queries.
A Temporal query is similar to a Temporal signal: both are messages sent to a running Workflow. The difference is that a signal intends to change the behaviour of the Workflow, whereas a query intends to inspect its current state. Querying the state of a Workflow implies that the Workflow must maintain state while running, typically in a Clojure atom or ref.
To enable querying a Workflow, you may use temporal.workflow/register-query-handler!. The query handler is a function that has a reference to the Workflow state, usually by closing over it. It interprets the query and returns a response.
(require '[temporal.workflow :refer [defworkflow register-query-handler!]])
(defworkflow stateful-workflow
  [{:keys [init] :as args}]
  (let [state (atom init)]
    (register-query-handler! (fn [query-type args]
                               (when (= query-type :my-query)
                                 (get-in @state [:path :to :answer]))))
    ;; e.g. react to signals (perhaps in a loop), updating the state atom
    ))
You may query a Workflow with temporal.client.core/query.
A query consists of a query-type (keyword) and possibly some args (any serializable data structure).
(query workflow :my-query {:foo "bar"})
This SDK integrates with the slingshot library. Stones cast with slingshot's throw+ are serialized and re-thrown across activity and workflow boundaries in a transparent manner that is compatible with slingshot idiomatic try+ based catch blocks.
By default, stones that are cast but not caught locally by an activity or workflow trigger ApplicationFailure semantics and are thus subject to the overall Retry Policies in place. However, the developer may force a given stone to be non-retriable by setting the ::non-retriable? flag (a keyword in the temporal.exceptions namespace) within the object.
Example:
(require '[temporal.exceptions :as e])
(require '[temporal.activity :refer [defactivity]])
(require '[slingshot.slingshot :refer [throw+]])
(defactivity my-activity
  [ctx args]
  (throw+ {:type ::my-fatal-error :msg "this error is non-retriable" ::e/non-retriable? true}))
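To make the effect of the flag concrete, the following is a minimal sketch of a retry loop in plain Clojure that gives up immediately when the thrown data is marked non-retriable. It is not the Temporal retry machinery and does not use slingshot: with-retries, the local ::non-retriable? key, and the use of ex-info in place of throw+ are all assumptions made for illustration.

```clojure
;; Toy retry loop. NOT the Temporal Retry Policy implementation:
;; with-retries and the local ::non-retriable? key are hypothetical.

(defn with-retries
  "Call (f attempt) up to max-attempts times. Rethrow immediately when the
  exception data carries ::non-retriable?; otherwise retry until the
  attempts are exhausted."
  [max-attempts f]
  (loop [attempt 1]
    (let [outcome (try
                    {:value (f attempt)}
                    (catch clojure.lang.ExceptionInfo e
                      (if (or (::non-retriable? (ex-data e))
                              (>= attempt max-attempts))
                        (throw e)
                        ::retry)))]
      (if (= outcome ::retry)
        (recur (inc attempt))
        (:value outcome)))))

;; A retriable failure: fails on attempts 1 and 2, succeeds on attempt 3.
(def flaky-calls (atom 0))
(def flaky-result
  (with-retries 5 (fn [attempt]
                    (swap! flaky-calls inc)
                    (if (< attempt 3)
                      (throw (ex-info "transient" {:type ::transient}))
                      :ok))))

;; A non-retriable failure: attempted once, then propagated to the caller.
(def fatal-calls (atom 0))
(def fatal-outcome
  (try
    (with-retries 5 (fn [_]
                      (swap! fatal-calls inc)
                      (throw (ex-info "fatal" {::non-retriable? true}))))
    (catch clojure.lang.ExceptionInfo e
      (.getMessage e))))
```

In this sketch the flaky function runs three times before succeeding, while the function that raises flagged data runs exactly once.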
The Temporal Platform requires that Workflow code be deterministic. Because of that requirement, this Clojure SDK exposes a workflow patching API, temporal.workflow/get-version. Workflow developers use the get-version function to determine the correct branch of logic to follow to maintain determinism.
Example:
Assume we have a workflow that invokes an activity using temporal.activity/invoke that we wish to convert to temporal.activity/local-invoke. Changing this for future workflows is not a problem. However, existing workflows must be handled with care, because replaying their recorded history against the new code could introduce non-determinism.
We can safely handle both the original and the new desired scenario by branching based on the results from calling temporal.workflow/get-version:
(require '[temporal.workflow :as w])
(require '[temporal.activity :as a])
(let [version (w/get-version ::local-activity w/default-version 1)]
  (cond
    (= version w/default-version) @(a/invoke versioned-activity :v1)
    (= version 1)                 @(a/local-invoke versioned-activity :v2)))
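The reason this branching preserves determinism can be sketched with a toy model in plain Clojure. This is an illustration under stated assumptions, not the SDK's implementation: the three-argument get-version below, the markers atom, the replaying? flag, and the value -1 for default-version are all hypothetical. The idea modeled is that a version is recorded the first time a new execution reaches the call, and every later replay reads that recorded value instead of choosing again.

```clojure
;; Toy model of version markers. NOT the SDK implementation: this
;; get-version, the markers atom, and replaying? are hypothetical.

(def default-version -1)

(defn get-version
  "Return the version recorded for change-id. If none is recorded, an
  execution reaching this point for the first time records and returns
  max-version, while an execution replaying history written by the old
  code returns default-version."
  [{:keys [markers replaying?]} change-id max-version]
  (if-let [recorded (get @markers change-id)]
    recorded
    (if replaying?
      default-version
      (do (swap! markers assoc change-id max-version)
          max-version))))

(defn choose-branch
  "Pick the code path the same way the cond in the example above does."
  [ctx]
  (let [version (get-version ctx ::local-activity 1)]
    (cond
      (= version default-version) :invoke
      (= version 1)               :local-invoke)))

;; History recorded by the old code contains no marker for this change.
(def old-execution {:markers (atom {}) :replaying? true})

;; A fresh execution records the new version when it reaches the call.
(def new-execution {:markers (atom {}) :replaying? false})
```

With these definitions, (choose-branch old-execution) selects :invoke, (choose-branch new-execution) selects :local-invoke, and a later replay of the new execution keeps selecting :local-invoke because the marker is already recorded.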
Contributors: Greg Haskins & Thomas Moerman