Activities are implementations of specific tasks performed during a Workflow execution. These tasks typically interact with external systems, such as databases, services, etc. They are free to block using conventional synchronization primitives or to run complex operations. An Activity forms the basis of a checkpoint, because the values entering and leaving an Activity become part of the Event History.
Workflows orchestrate invocations of Activities.
An Activity implementation consists of defining a (defactivity) function. The platform invokes this function each time an Activity execution is started by a Workflow or retried by the platform. Returning from the method signals that the Activity is considered complete. The result is then made available to the calling Workflow.
(require '[temporal.activity :refer [defactivity] :as a])
(defactivity greet-activity
[ctx {:keys [name] :as args}]
(str "Hi, " name))
By default, Activities are automatically registered simply by declaring a (defactivity). You may optionally manually specify Activities to register when creating Workers (see temporal.client.worker/worker-options).
It should be noted that the name of the Activity, the arguments that the Activity accepts, and the data that the Activity returns are all part of a contract you need to maintain across potentially long-lived instances. Therefore, the Activity definition must be treated with care whenever refactoring code.
In this Clojure SDK, Workflows start Activities with either temporal.activity/invoke or temporal.activity/local-invoke, both of which share similar calling conventions. The primary difference between them is the execution model under the covers (See What is a Local Activity)
(require '[temporal.activity :refer [defactivity] :as a])
(defactivity my-activity
[ctx {:keys [foo] :as args}]
...)
(a/invoke my-activity {:foo "bar"})
Within an activity implementation, you can inspect runtime information about the current execution via temporal.activity/get-info.
| Key | Description |
|---|---|
:task-token | Opaque task token identifying this execution attempt |
:activity-id | Unique ID for this activity instance |
:activity-type | The registered name of the activity type |
:in-workflow? | true when running inside a workflow, false for standalone activities (Temporal Java SDK 1.35+) |
(require '[temporal.activity :refer [defactivity] :as a])
(defactivity my-activity
[_ _]
(let [info (a/get-info)]
(log/info "Activity ID:" (:activity-id info))
...))
The :workflow-id and :run-id fields are only populated when the activity runs as part of a workflow execution. When running as a standalone activity (invoked directly without a workflow, a feature introduced in Temporal Java SDK 1.35), these fields are nil.
Use temporal.activity/in-workflow? before accessing these fields to avoid null-pointer errors:
(defactivity my-activity
[_ _]
(when (a/in-workflow?)
(let [{:keys [workflow-id run-id]} (a/get-info)]
(log/info "Parent workflow:" workflow-id "run:" run-id))))
temporal.activity vs temporal.client.activity| Namespace | Usage |
|---|---|
temporal.activity | Activities executed by workers as part of workflow orchestration (current) |
temporal.client.activity | Direct standalone activity invocation outside a workflow context (Temporal Java SDK 1.35+) |
Activities defined with defactivity in temporal.activity work in both contexts. The difference is in how they are invoked: via a/invoke inside a workflow, or via temporal.client.activity for standalone execution. In standalone context, :in-workflow? returns false and workflow-scoped fields are nil.
The traditional way to detect activity cancellation is to call heartbeat and catch ActivityCanceledException.
Temporal Java SDK 1.36 introduced get-cancellation-token, which provides cancellation detection without requiring a heartbeat. This is useful for activities that do not otherwise heartbeat.
Note: Cancellation delivery via this API requires a recent Temporal server version.
Use the native helper functions to interact with the token without Java interop:
Use cancellation-requested? to check cancellation state without blocking:
(require '[temporal.activity :refer [defactivity] :as a])
(defactivity my-activity
[_ _]
(let [token (a/get-cancellation-token)]
(loop []
(Thread/sleep 100)
(if (a/cancellation-requested? token)
:cancelled
(recur)))))
Use throw-if-cancelled! as a lightweight checkpoint; it throws ActivityCanceledException if cancelled, no-ops otherwise:
(defactivity my-activity
[_ _]
(let [token (a/get-cancellation-token)]
(dotimes [_ 10]
(a/throw-if-cancelled! token)
(Thread/sleep 500))
:completed))
Use cancellation-future to obtain a promesa-compatible promise that completes when cancellation is requested:
(defactivity my-activity
[_ _]
(let [token (a/get-cancellation-token)]
@(a/cancellation-future token) ;; blocks until cancelled
:cancelled))
Returning a core.async channel places the Activity into Asynchronous mode. The Activity is then free to send a single message on the channel at a future time to signal completion. The value sent is returned to the calling Workflow. Sending a Throwable will signal a failure of the Activity.
(require '[temporal.activity :refer [defactivity] :as a])
(require '[clojure.core.async :refer [go <!] :as async])
(defactivity async-greet-activity
[ctx {:keys [name] :as args}]
(go
(<! (async/timeout 1000))
(str "Hi, " name)))
Can you improve this documentation?Edit on GitHub
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |