Workflows are resilient programs that continue executing even when failures occur.
Workflows encapsulate the execution/orchestration of Tasks, including Activities and child Workflows. They must also react to external events, deal with Timeouts, etc.
In this Clojure SDK programming model, a Temporal Workflow is a function declared with defworkflow:
(defworkflow my-workflow
[params]
...)
A Workflow implementation consists of defining a (defworkflow) function. The platform invokes this function each time a new Workflow execution is started or retried. Returning from the function signals that the Workflow execution is complete. The result is available to the caller via temporal.client.core/get-result.
(require '[temporal.workflow :refer [defworkflow]])
(defworkflow my-workflow
[{:keys [foo]}]
...)
Temporal uses the Event Sourcing pattern to recover the state of a Workflow object, including its threads and local variable values. The Workflow code is re-executed from the beginning whenever a Workflow state requires restoration. During replay, successfully executed Activities are not re-executed but return the result previously recorded in the Workflow event history.
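The replay behavior described above can be pictured with a toy model in plain Clojure. This is a minimal sketch, not the SDK's implementation: make-context, invoke-activity, and toy-workflow are hypothetical names, and the "history" is just a vector of recorded activity results.

```clojure
;; Toy model of event-sourced replay. NOT the SDK's implementation:
;; every name below is hypothetical and exists only for illustration.

(defn make-context
  "Builds a hypothetical execution context over a recorded event history."
  [recorded-events]
  {:history (atom (vec recorded-events)) ; activity results recorded so far
   :cursor  (atom 0)                     ; position of the next event to replay
   :calls   (atom 0)})                   ; how many activities really executed

(defn invoke-activity
  "Returns the recorded result when one exists (replay); otherwise runs
  activity-fn, appends its result to the history, and returns it."
  [{:keys [history cursor calls]} activity-fn arg]
  (let [i @cursor]
    (swap! cursor inc)
    (if (< i (count @history))
      (nth @history i)                   ; replay: reuse the recorded result
      (let [result (activity-fn arg)]    ; first execution: actually run it
        (swap! calls inc)
        (swap! history conj result)
        result))))

(defn toy-workflow
  "Deterministic workflow logic: two activity steps chained together."
  [ctx n]
  (let [a (invoke-activity ctx inc n)
        b (invoke-activity ctx #(* 2 %) a)]
    {:result b}))

;; First execution: both activities run and their results are recorded.
(def first-run (make-context []))
(def first-result (toy-workflow first-run 1))

;; Replay from the beginning against the recorded history: no activity
;; runs again, yet the workflow code arrives at the same state.
(def replay (make-context @(:history first-run)))
(def replay-result (toy-workflow replay 1))
```

In this sketch the replayed run reaches the same result only because toy-workflow makes the same calls in the same order, which is why the determinism constraints matter.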
Even though Temporal has the replay capability, which brings resilience to your Workflows, you should never think about this capability when writing your Workflows. Instead, you should focus on implementing your business logic/requirements and write your Workflows as if they would execute only once.
There are some things, however, to think about when writing your Workflows, namely determinism and isolation. We summarize these constraints here:
By default, Workflows are automatically registered simply by declaring a (defworkflow). You may optionally manually specify Workflows to register when creating Workers (see temporal.client.worker/worker-options).
It should be noted that the name of the Workflow, the arguments and signals that the Workflow accepts, and the data that the workflow returns are all part of a contract you need to maintain across potentially long-lived instances. Therefore, you should take care when refactoring code involving Workflow logic to avoid inadvertently breaking your contract.
In this Clojure SDK, developers manage Workflows with the following flow:
params passed to these functions will be forwarded to the Workflow and available as args in the request map of the Workflow.
(defworkflow my-workflow
[{:keys [foo]}]
...)
(let [w (create-workflow client my-workflow {:task-queue "MyTaskQueue"})]
(start w {:foo "bar"})
@(get-result w))
When starting a workflow, you may encounter a situation where a workflow with the same ID is already running. The :workflow-id-conflict-policy option controls this behavior:
| Policy | Description |
|---|---|
| :fail | Fail with an error if workflow is already running (default) |
| :use-existing | Return a handle to the existing running workflow instead of starting a new one |
| :terminate-existing | Terminate the running workflow and start a new one |
This is particularly useful with signal-with-start and update-with-start, where you want to interact with an existing workflow if running, or start a new one if not:
(let [w (create-workflow client my-workflow {:task-queue "MyTaskQueue"
:workflow-id "my-singleton-workflow"
:workflow-id-conflict-policy :use-existing})]
(signal-with-start w :my-signal {:data "value"} {:initial "args"})
@(get-result w))
Note: This differs from :workflow-id-reuse-policy, which controls whether you can reuse an ID from a completed/failed/terminated workflow. The conflict policy specifically handles currently running workflows.
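The policy table above can be read as a small decision function. The following is a sketch in plain Clojure under stated assumptions: resolve-start and its return keywords are hypothetical and only model the documented outcomes; they are not part of the SDK.

```clojure
;; Toy decision function mirroring the policy table. Hypothetical names only.
(defn resolve-start
  "Given the set of currently running workflow IDs, the requested ID, and a
  conflict policy, returns a keyword describing what a start request does."
  [running-ids workflow-id policy]
  (if-not (contains? running-ids workflow-id)
    :start-new                                   ; no conflict: policy is irrelevant
    (case policy
      :fail               :error                 ; default behavior
      :use-existing       :return-existing-handle
      :terminate-existing :terminate-then-start-new)))

(def running #{"my-singleton-workflow"})

(def outcomes
  {:no-conflict (resolve-start running "another-id" :fail)
   :fail        (resolve-start running "my-singleton-workflow" :fail)
   :use         (resolve-start running "my-singleton-workflow" :use-existing)
   :terminate   (resolve-start running "my-singleton-workflow" :terminate-existing)})
```

Note that the policy is consulted only when the ID belongs to a workflow that is currently running, which is the distinction the note above draws against :workflow-id-reuse-policy.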
A Temporal Workflow instance behaves like a Lightweight Process or Fiber. These designs support a high ratio of Workflow instances to CPUs, often in the range of 1000:1 or greater. Achieving this feat requires controlling the IO in and out of the instance to maximize resource sharing. Therefore, any LWP/Fiber implementation will generally provide its own IO constructs (e.g., mailboxes, channels, promises, etc.), and Temporal is no exception.
In this Clojure SDK, this support comes in a few different flavors:
Certain operations naturally return Workflow-safe Promises, such as invoking an Activity from a Workflow. The Clojure SDK integrates these Workflow-safe Promises with the promesa library. This section serves to document their use and limitations.
You must ensure that all promises originate with an SDK-provided function, such as temporal.activity/invoke or temporal.promise/rejected. For aggregating operations, see the Temporal-safe options temporal.promise/all and temporal.promise/race.
What this means in practice is that any promise chain should generally start with some Temporal-native promise.
Do NOT do this:
(require '[promesa.core :as p])
(-> (p/resolved true)
(p/then (fn [x] (comment "do something with x"))))
Starting the chain with (p/resolved) (or anything else that ultimately creates a promesa promise) will not work, and you will receive a run-time error.
The proper method is to ensure that a Temporal native operation starts the chain.
...
(require '[temporal.activity :as a])
(-> (a/invoke some-activity {:some "args"})
(p/then (fn [x] (comment "do something with x"))))
The following situation can lead to a failure:
(-> (when some-condition
(a/invoke some-activity {:some "args"}))
(p/then (fn [x] ...)))
This fails when some-condition is false, because promesa will cast the scalar nil to (p/resolved nil), thus violating the origination rule. Instead, do this:
...
(require '[temporal.promise :as tp])
(-> (if some-condition
(a/invoke some-activity {:some "args"})
(tp/resolved false))
(p/then (fn [x] ...)))
Doing so ensures that you meet the origination rules regardless of the conditional outcome.
You may use temporal.workflow/await to efficiently park the Workflow until a provided predicate evaluates to true. The Temporal platform will re-evaluate the predicate at each major state transition of the Workflow.
Your Workflow may send or receive signals.
This SDK provides a core.async inspired abstraction on Temporal Signals called signal-channels. To use signal channels, your Workflow may either block waiting for signals with temporal.signals/<! or use the non-blocking temporal.signals/poll. Either way, your Workflow must first obtain the signal-chan context via temporal.signals/create-signal-chan.
(require '[temporal.signals :as s])
(defworkflow my-workflow
  [args]
  (let [signals (s/create-signal-chan)
        ;; block until a "MySignal" message arrives
        message (s/<! signals "MySignal")]
    ...))
Alternatively, you may opt to handle signals directly with temporal.signals/register-signal-handler!:
(defworkflow my-workflow
[args]
(let [state (atom 0)]
(s/register-signal-handler! (fn [signal-name args]
(swap! state inc)))
(w/await (fn [] (> @state 1)))
@state))
Note: The operations above are for receiving signals within a workflow. To send signals from outside a workflow (from a client), use temporal.client.core/>!. To send signals to other workflows from within a workflow, see External Workflows.
Your Workflow may respond to queries.
A Temporal query is similar to a Temporal signal; both are messages sent to a running Workflow. The difference is that a signal intends to change the behaviour of the Workflow, whereas a query intends to inspect its current state. Querying the state of a Workflow implies that the Workflow must maintain its state while running, typically in a Clojure atom or ref.
To enable querying a Workflow, you may use temporal.workflow/register-query-handler!. The query handler is a function that has a reference to the Workflow state, usually by closing over it. It interprets the query and returns a response.
(defworkflow stateful-workflow
[{:keys [init] :as args}]
(let [state (atom init)]
(register-query-handler! (fn [query-type args]
(when (= query-type :my-query)
(get-in @state [:path :to :answer]))))
;; e.g. react to signals (perhaps in a loop), updating the state atom
))
You may query a Workflow with temporal.client.core/query.
A query consists of a query-type (keyword) and possibly some args (any serializable data structure).
(query workflow :my-query {:foo "bar"})
Updates are similar to signals and queries but combine features of both: like signals, they can mutate workflow state; like queries, they return values to the caller. Additionally, updates can perform blocking operations such as invoking activities or child workflows.
To enable updates on a Workflow, use temporal.workflow/register-update-handler!. The update handler is a function that has a reference to the Workflow state, typically by closing over it. It processes the update, optionally mutates state, and returns a response.
(defworkflow stateful-workflow
[{:keys [init] :as args}]
(let [state (atom init)]
(register-update-handler!
(fn [update-type args]
(when (= update-type :increment)
(swap! state update :counter + (:amount args))
@state)))
;; workflow implementation
))
You may optionally provide a validator function that runs before the update handler. Validators must not mutate state or perform blocking operations:
(register-update-handler!
(fn [update-type args]
(swap! state update :counter + (:amount args))
@state)
{:validator (fn [update-type args]
(when (neg? (:amount args))
(throw (IllegalArgumentException. "amount must not be negative"))))})
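The ordering described above (validator first, handler only when the validator does not throw) can be sketched in plain Clojure. This is an illustration under stated assumptions, not the SDK's dispatch code: dispatch-update and the :accepted/:rejected result maps are hypothetical.

```clojure
;; Toy dispatcher showing validator-before-handler ordering.
;; dispatch-update is hypothetical; it is NOT part of the SDK.

(def state (atom {:counter 0}))

(defn handler
  "Update handler: mutates state and returns the new state."
  [_update-type {:keys [amount]}]
  (swap! state update :counter + amount)
  @state)

(defn validator
  "Validator: read-only check that throws to reject the update."
  [_update-type {:keys [amount]}]
  (when (neg? amount)
    (throw (IllegalArgumentException. "amount must not be negative"))))

(defn dispatch-update
  "Runs the validator first; the handler only runs if validation passes."
  [handler validator update-type args]
  (if-let [reason (try
                    (validator update-type args)
                    nil                               ; validation passed
                    (catch IllegalArgumentException e
                      (.getMessage e)))]
    {:status :rejected :reason reason}
    {:status :accepted :result (handler update-type args)}))

(def accepted (dispatch-update handler validator :increment {:amount 5}))
(def rejected (dispatch-update handler validator :increment {:amount -1}))
```

Because the rejected update never reaches the handler, the state atom is left untouched by it, which is why validators themselves must not mutate state.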
You may send an update to a Workflow with temporal.client.core/update.
An update consists of an update-type (keyword) and possibly some args (any serializable data structure).
(update workflow :increment {:amount 5})
For cases where you want to send an update without blocking until completion, use temporal.client.core/start-update. This returns a handle that you can use to retrieve the result later:
(let [handle (c/start-update workflow :increment {:amount 5})]
;; Do other work while update is processing...
(println "Update ID:" (:id handle))
;; When ready, wait for the result
(println "Result:" @(:result handle)))
The handle contains:
- :id - The unique identifier for this update
- :execution - The workflow execution info
- :result - A promise that resolves to the update result
You can customize the behavior with options:
(c/start-update workflow :increment {:amount 5}
{:update-id "my-idempotency-key"
:wait-for-stage :completed}) ;; :admitted, :accepted, or :completed
To retrieve a handle for a previously initiated update, use temporal.client.core/get-update-handle:
(let [handle (c/get-update-handle workflow "my-update-id")]
@(:result handle))
To atomically start a workflow (if not running) and send an update, use temporal.client.core/update-with-start. This is useful for ensuring a workflow exists before sending an update:
(let [workflow (c/create-workflow client my-workflow
{:task-queue task-queue
:workflow-id "my-workflow-id"
:workflow-id-conflict-policy :use-existing})
handle (c/update-with-start workflow :increment {:amount 5} {:initial-value 0})]
@(:result handle))
Important: update-with-start requires the :workflow-id-conflict-policy option to be set in the workflow options (see Workflow ID Conflict Policy).
Temporal workflows execute on a single thread, so you don't need to worry about parallelism. However, you do need to consider concurrency when signal or update handlers can block (call activities, sleep, await, or child workflows).
Non-blocking handlers run to completion atomically. No lock needed:
;; Non-blocking handler - runs atomically, no lock needed
(fn [update-type {:keys [amount]}]
(swap! state update :counter + amount)
@state)
Blocking handlers can be interrupted at await points, allowing other handlers to interleave. Use temporal.workflow/new-lock and temporal.workflow/with-lock when a handler performs blocking operations AND accesses shared state:
;; PROBLEM: Handler could be interrupted at the activity call
(fn [update-type args]
(let [current (:counter @state)] ;; read state
(let [result @(a/invoke validate-activity current)] ;; BLOCKS - other handlers can run!
(swap! state assoc :validated result)))) ;; another handler may have changed state
Use a lock to prevent other handlers from interleaving:
(defworkflow my-workflow
[args]
(let [state (atom {})
lock (w/new-lock)]
(w/register-update-handler!
(fn [update-type args]
(w/with-lock lock
;; Lock ensures no other handler runs during this sequence
(let [current (:counter @state)]
(let [result @(a/invoke validate-activity current)]
(swap! state assoc :validated result)
@state)))))
;; workflow implementation
))
When handlers perform blocking operations (activities, child workflows, sleep, await), your workflow could complete before handlers finish their work. This can cause interrupted handler execution, client errors, or lost work.
Use temporal.workflow/every-handler-finished? with temporal.workflow/await to wait for all handlers to complete:
(defworkflow my-workflow
[args]
(let [state (atom {:counter 0})]
(w/register-update-handler!
(fn [update-type args]
;; Handler that does async work
(let [result @(a/invoke process-activity args)]
(swap! state assoc :processed result)
@state)))
;; Do main workflow logic...
(do-work state)
;; Wait for any in-progress handlers before completing
(w/await w/every-handler-finished?)
@state))
By default, your worker will log a warning when a workflow completes with unfinished handlers.
External workflows allow you to interact with independently running workflows from within a workflow context. Unlike child workflows (which are started and managed by a parent workflow), external workflows are started independently and you simply obtain a handle to communicate with them.
Use external workflows when you need to:
Note: External workflow stubs do NOT support query, update, or getResult operations; those are client-only operations. External workflows support only signaling and cancellation.
Use temporal.workflow/external-workflow to create a handle to an external workflow:
(require '[temporal.workflow :as w])
;; Create handle using just workflow ID (targets latest run)
(let [handle (w/external-workflow "other-workflow-id")]
...)
;; Or specify a specific run ID
(let [handle (w/external-workflow "other-workflow-id" "specific-run-id")]
...)
Use temporal.workflow/signal-external to send signals to an external workflow:
(defworkflow coordinator-workflow
[{:keys [worker-ids]}]
(doseq [worker-id worker-ids]
(let [worker (w/external-workflow worker-id)]
(w/signal-external worker :new-task {:task-id (random-uuid)}))))
Use temporal.workflow/cancel-external to request cancellation of an external workflow:
(defworkflow supervisor-workflow
[{:keys [worker-id]}]
(let [worker (w/external-workflow worker-id)]
;; Request cancellation
(w/cancel-external worker)))
The external workflow will receive a CancellationException and can handle it appropriately.
Use temporal.workflow/get-execution to retrieve the workflow ID and run ID:
(let [handle (w/external-workflow "some-workflow-id")
execution (w/get-execution handle)]
(log/info "Workflow:" (:workflow-id execution) "Run:" (:run-id execution)))
This SDK integrates with the slingshot library. Stones cast with Slingshot's throw+ are serialized and re-thrown across activity and workflow boundaries in a transparent manner that is compatible with Slingshot's try+ based catch blocks.
By default, stones cast that are not caught locally by an Activity or Workflow trigger ApplicationFailure semantics and are thus subject to the overall Retry Policies in place. However, the developer may force a given stone to be non-retriable by setting the flag '::non-retriable?' within the object.
Example:
(require '[temporal.exceptions :as e])
(require '[slingshot.slingshot :refer [throw+]])
(defactivity my-activity
[ctx args]
(throw+ {:type ::my-fatal-error :msg "this error is non-retriable" ::e/non-retriable? true}))
The Temporal Platform requires that Workflow code be deterministic. Because of that requirement, this Clojure SDK exposes a workflow patching API temporal.workflow/get-version. Workflow developers use the get-version function to determine the correct branch of logic to follow to maintain determinism.
Example:
Assume we have a workflow that invokes an activity using temporal.activity/invoke that we wish to convert to temporal.activity/local-invoke. Such a change is acceptable for any future instances of your Workflow. However, it must be handled carefully for existing instances, because this logic change could introduce non-determinism during replay.
We can safely handle both the original and the new desired scenario by branching based on the results from calling temporal.workflow/get-version:
(require '[temporal.workflow :as w])
(require '[temporal.activity :as a])
(let [version (w/get-version ::local-activity w/default-version 1)]
(cond
(= version w/default-version) @(a/invoke versioned-activity :v1)
(= version 1) @(a/local-invoke versioned-activity :v2)))
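Why branching on the version keeps replay deterministic can be shown with a toy model in plain Clojure. This is a sketch under stated assumptions rather than the SDK's code: the get-version below is a hypothetical stand-in for temporal.workflow/get-version, default-version is assumed to be -1 (as in the Temporal Java SDK), and the context map with :replaying? and :markers is invented for illustration.

```clojure
;; Toy model of version markers. Hypothetical stand-ins, NOT the SDK API.

(def default-version -1) ; assumption: mirrors the Java SDK's default version

(defn get-version
  "Returns the version to use for change-id:
  - a marker recorded in history wins (replay of a run that saw the change);
  - replaying with no marker means the run predates the change;
  - otherwise this is new execution: record max-supported and use it."
  [{:keys [replaying? markers]} change-id min-supported max-supported]
  (let [version (cond
                  (contains? @markers change-id) (get @markers change-id)
                  replaying?                     default-version
                  :else (do (swap! markers assoc change-id max-supported)
                            max-supported))]
    (when-not (<= min-supported version max-supported)
      (throw (ex-info "Unsupported version"
                      {:change-id change-id :version version})))
    version))

(defn choose-invocation
  "Picks the code path exactly as the cond in the example above does."
  [ctx]
  (let [version (get-version ctx :local-activity default-version 1)]
    (cond
      (= version default-version) :invoke
      (= version 1)               :local-invoke)))

;; A brand-new execution records the marker and takes the new path.
(def new-run    {:replaying? false :markers (atom {})})
;; Replaying a run started before the change: no marker, old path.
(def old-replay {:replaying? true  :markers (atom {})})
;; Replaying a run started after the change: marker found, new path.
(def new-replay {:replaying? true  :markers (atom {:local-activity 1})})
```

In this model a run always replays along the path it originally took, so old executions keep using invoke while new ones use local-invoke.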
Long-running Workflows can accumulate large event histories. To prevent unbounded growth, you can use temporal.workflow/continue-as-new to complete the current Workflow execution and immediately start a new execution with the same Workflow ID but a fresh event history.
This is particularly useful for Workflows that:
(require '[temporal.workflow :as w])
(defworkflow batch-processor
[{:keys [batch-number processed-count]}]
(let [items (fetch-next-batch)
new-count (+ processed-count (count items))]
(process-items items)
(if (more-batches?)
(w/continue-as-new {:batch-number (inc batch-number)
:processed-count new-count})
{:total-processed new-count})))
Continue-as-new accepts an optional options map to override settings for the new run:
(import '[java.time Duration])
(w/continue-as-new params {:task-queue "different-queue"
                           :workflow-run-timeout (Duration/ofHours 1)})
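The effect of continue-as-new on history size can be pictured with a toy driver in plain Clojure. This is a minimal sketch, not how the platform is implemented: run-once and drive are hypothetical, and each "event" is just a vector recorded per processed item.

```clojure
;; Toy model of continue-as-new. Hypothetical names, NOT the SDK API.

(defn run-once
  "One run: processes a single batch, recording one event per item.
  Returns either a final :result or a :continue-as-new request."
  [{:keys [batches processed-count]}]
  (let [[batch & remaining] batches
        events    (mapv (fn [item] [:processed item]) batch)
        new-count (+ processed-count (count batch))]
    (if (seq remaining)
      {:events events
       :continue-as-new {:batches remaining :processed-count new-count}}
      {:events events
       :result {:total-processed new-count}})))

(defn drive
  "Stands in for the platform: whenever a run asks to continue as new,
  starts the next run with a fresh (empty) history and the carried params."
  [params]
  (loop [params params
         history-sizes []]
    (let [{:keys [events continue-as-new result]} (run-once params)
          sizes (conj history-sizes (count events))]
      (if continue-as-new
        (recur continue-as-new sizes)
        {:result result :history-sizes sizes}))))

(def outcome
  (drive {:batches [[1 2 3] [4 5] [6]] :processed-count 0}))
```

Each run's history stays bounded by a single batch, while the accumulated count travels forward through the params passed to the next run.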
Search attributes are indexed metadata that can be used to filter and search for Workflows in the Temporal UI and via the Visibility API.
You can set search attributes when starting a Workflow:
(c/create-workflow client my-workflow {:task-queue task-queue
:search-attributes {"CustomerId" "12345"}})
To update search attributes during Workflow execution, use temporal.workflow/upsert-search-attributes:
(require '[temporal.workflow :as w])
(defworkflow order-workflow
[{:keys [order-id]}]
(w/upsert-search-attributes {"OrderStatus" {:type :keyword :value "processing"}})
;; ... process order ...
(w/upsert-search-attributes {"OrderStatus" {:type :keyword :value "completed"}}))
Supported search attribute types:
- :text - Full-text searchable string
- :keyword - Exact-match string
- :int - 64-bit integer
- :double - 64-bit floating point
- :bool - Boolean
- :datetime - OffsetDateTime
- :keyword-list - List of exact-match strings
Note: Custom search attributes must be pre-registered with the Temporal server before use.
Contributors: Greg Haskins, Thomas Moerman & Gregory Haskins