Effects are the mechanism through which spins interact with the outside world — other spins, signals, and async sequences. The spin macro recognizes effect calls as CPS breakpoints, transforming them into non-blocking suspension points.
await — Spin-to-Spin Dependency

await suspends the current spin until a child spin, deferred, or mailbox produces a value. It also registers a dependency so the parent re-executes when the child changes.
(require '[org.replikativ.spindel.effects.await :refer [await]])
(spin
  (let [x (await child-spin)
        y (await other-spin)]
    (+ x y)))
Supported types:
| Type | Behavior |
|---|---|
| Spin | Awaits completion, tracks dependency. Fast path if already cached. |
| Deferred | Suspends until value delivered via deliver! |
| Mailbox | Suspends until message posted via post! |
| SignalRef | Error — use track instead |
Fast path: If the awaited spin is already completed and cached, await returns the cached value immediately without suspending.
Reactive re-execution: When an awaited child spin changes (e.g., because one of its tracked signals changed), the parent is marked dirty and re-executes on next deref.
Cancellation check: await checks if the current spin has been cancelled. If so, it throws ::spin-cancelled instead of proceeding.
track — Signal-to-Spin Dependency

track reads a signal's current value and registers a reactive dependency. The spin re-executes when the signal changes.
(require '[org.replikativ.spindel.effects.track :refer [track]])
(spin
  (let [{:keys [new old deltas]} (track my-signal)]
    ;; new: current value
    ;; old: value at previous execution
    ;; deltas: structural changes (for deltaable collections)
    (process new)))
Return value: An Interval with three perspectives:
;; Map destructuring
(let [{:keys [new old deltas]} (track sig)] ...)
;; Sequential destructuring
(let [[new-val old-val deltas] (track sig)] ...)
;; Just the current value
(let [val @(track sig)] ...)
| Field | Description |
|---|---|
| :new | Current signal value |
| :old | Value at previous spin execution (nil on first run) |
| :deltas | Structural changes since last run (for deltaable collections) |
Delta shape depends on what you're tracking:
If the signal contains a deltaable collection (d/deltaable-vector,
-map, -set), :deltas is the raw per-mutation vocabulary —
the input edge of the pipeline:
[{:delta :add :path [idx-or-key] :value v}
 {:delta :update :path [idx-or-key] :value v :old-value old-v}
 {:delta :remove :path [idx-or-key] :value v}]
If you tracked a typed-combinator interval (e.g. an imap/ifilter
result), :deltas is the combinator's algebra-specific record — a
SequenceAlgebra {:degree :grow :shrink :permutation :change :freeze},
a MapAlgebra {:assoc :dissoc :update}, or a ScalarAlgebra
::no-change | [::replace v].
nil means "I don't know what changed — recompute from :new".
See incremental.md for the full typed delta algebra
and the 3-state :deltas contract.
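As an illustration of consuming the raw per-mutation vocabulary, the sketch below folds a delta vector into a plain Clojure map and falls back to :new when :deltas is nil. It is library-free and makes several assumptions: apply-map-deltas and refresh are hypothetical helper names, the interval is modelled as an ordinary map with :new and :deltas keys, and only map-shaped data is handled (index shifts in vectors are out of scope).

```clojure
(defn apply-map-deltas
  "Folds raw per-mutation deltas into the plain map `m`.
  Hypothetical helper, not part of spindel."
  [m deltas]
  (reduce
   (fn [acc {:keys [delta path value]}]
     (case delta
       (:add :update) (assoc-in acc path value)
       :remove        (if (next path)
                        ;; nested path: remove the last key from its parent map
                        (update-in acc (vec (butlast path)) dissoc (last path))
                        (dissoc acc (first path)))))
   m
   deltas))

(defn refresh
  "Brings `cache` up to date from an interval-shaped map."
  [cache {:keys [new deltas]}]
  (if (nil? deltas)
    new                               ; unknown change: recompute from :new
    (apply-map-deltas cache deltas))) ; known change: patch incrementally

(refresh {:a 1 :b 2 :c 3}
         {:new    {:a 1 :b 20}
          :deltas [{:delta :update :path [:b] :value 20 :old-value 2}
                   {:delta :remove :path [:c] :value 3}]})
;; => {:a 1, :b 20}
```

Inside a spin, the second argument would be the value returned by track; here it is written out by hand so the example runs on its own.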
Cancellation check: Like await, track checks cancellation before proceeding.
yield — Async Sequence Generation

yield emits a value in an async sequence generator. Only usable inside gen-aseq.
(require '[org.replikativ.spindel.seq.core :refer [gen-aseq yield]])
(def numbers
  (gen-aseq
    (yield 1)
    (yield 2)
    (yield 3)))
Each yield suspends the generator until the consumer requests the next value via anext. See async sequences in Getting Started for consumption patterns.
yield outside gen-aseq: Throws an error. The yield handler is only bound inside gen-aseq bodies.
gen-aseq

Generate a lazy async sequence:
(def countdown
  (gen-aseq
    (loop [n 5]
      (when (pos? n)
        (yield n)
        (recur (dec n))))))
Cold semantics: Each consumer (via anext) gets independent execution. Multiple consumers see independent sequences.
Spin integration: You can await spins inside gen-aseq:
(def processed
  (gen-aseq
    (loop [n 0]
      (when (< n 3)
        (let [result (await (fetch-data n))]
          (yield (* 2 result))
          (recur (inc n)))))))
for — Async Sequence Comprehension

Like clojure.core/for but for async sequences, with spindel effect support:
(require '[org.replikativ.spindel.seq.core :as seq-core])
(spin
  (let [aseq (seq-core/for [x [1 2 3]
                            :when (odd? x)]
               (await (async-double x)))]
    ;; Consume with anext
    (loop [s aseq acc []]
      (if-let [[v rest] (await (seq-core/anext s))]
        (recur rest (conj acc v))
        acc))))
;; => [2 6]
Supports all for modifiers: :let, :when, :while, and multiple bindings.
Use anext to consume one element at a time:
(require '[org.replikativ.spindel.seq.core :as seq-core])
(spin
  ;; anext returns [value rest-seq] or nil (end of sequence)
  (let [[v1 rest1] (await (seq-core/anext my-seq))
        [v2 rest2] (await (seq-core/anext rest1))]
    [v1 v2]))
Collect into a vector:
(spin
  (await (seq-core/into [] my-seq)))
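The [value rest-seq] or nil contract of anext, and what cold semantics mean for multiple consumers, can be modelled without the library. The following is a synchronous sketch only: a sequence is represented as a zero-argument function, countdown-from and collect are hypothetical names, and nothing here suspends or calls into spindel.

```clojure
(defn countdown-from
  "Returns a cold sequence: a thunk producing [value rest] or nil at the end."
  [n]
  (fn []
    (when (pos? n)
      [n (countdown-from (dec n))])))

(defn collect
  "Drains a sequence into a vector, one step at a time."
  [s]
  (loop [s s, acc []]
    (if-let [[v more] (s)]
      (recur more (conj acc v))
      acc)))

(def countdown (countdown-from 3))

(collect countdown) ;; => [3 2 1]
(collect countdown) ;; => [3 2 1], a second consumer starts from the beginning
```

The loop in collect has the same shape as the anext loop shown earlier, with the await removed because this model is synchronous.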
A deferred is a single-assignment value that multiple spins can await:
(require '[org.replikativ.spindel.spin.sync :refer [deferred deliver!]])
(def d (deferred))
;; In a spin — suspends until delivered
(spin
  (let [value (await d)]
    (process value)))
;; From external code (future, callback, etc.)
(deliver! d 42)
Only the first deliver! succeeds; a deferred is single-assignment.

Internal vs external delivery:
- (d value): deliver from within the same execution context (internal)
- (deliver! d value): deliver from external threads (enqueues event, safe)

A mailbox is a FIFO queue for message passing between spins:
(require '[org.replikativ.spindel.spin.sync :refer [mailbox post!]])
(def mbx (mailbox))
;; Consumer spin — suspends until a message is available
(spin
  (loop []
    (let [msg (await mbx)]
      (process msg)
      (recur))))
;; Producer — post messages
(post! mbx {:type :task :data "work"})
Waiter struct (internal, relevant if you extend mailbox-like
primitives): each pending await lives in state-atom.:waiters as
{:spin-id … :cancel-token … :resolve …}. post-inline! reads
:engine/cancelled-tokens from the current execution context once
per call and skips waiters whose :cancel-token is in the cancelled
set — without consuming the message (it recurs onto the next
waiter, or pushes the message back to :queue if no live waiter
remains). This prevents the "orphaned waiter silently absorbs a
producer's post" message-loss bug when a body's await is truncated
by a track-resume. See Cont-level cancellation gate for the gate machinery; the
full design is documented in source-comments at
src/org/replikativ/spindel/effects/await.cljc
(search for cancellable-external-pair).
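The skip rule can be modelled as a pure function over the mailbox state. This is a simplified sketch, not the library's post-inline!: post-message is a hypothetical name, the cancelled-token set is passed in as an argument rather than read from the execution context, :queue is assumed to be a vector, and the function returns the chosen waiter instead of calling its :resolve.

```clojure
(defn post-message
  "Returns [new-state waiter]. `waiter` is nil when the message was queued."
  [{:keys [queue waiters] :as state} cancelled-tokens msg]
  (let [cancelled? #(contains? cancelled-tokens (:cancel-token %))
        ;; Skip orphaned waiters without handing them the message.
        live       (drop-while cancelled? waiters)]
    (if-let [waiter (first live)]
      [(assoc state :waiters (vec (rest live))) waiter]
      ;; No live waiter remains: keep the message for a future await.
      [(assoc state :waiters [] :queue (conj queue msg)) nil])))

(def state
  {:queue   []
   :waiters [{:spin-id 1 :cancel-token :t1 :resolve (fn [m] [:orphan m])}
             {:spin-id 2 :cancel-token :t2 :resolve (fn [m] [:live m])}]})

;; Token :t1 is cancelled, so the message goes to the waiter for spin 2.
(let [[next-state waiter] (post-message state #{:t1} :hello)]
  ((:resolve waiter) :hello))
;; => [:live :hello]
```

With both tokens cancelled, the same call returns a nil waiter and a state whose :queue holds the message, which is the "no live waiter remains" branch described above.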
A spin that never completes, useful with race and timeout:
(require '[org.replikativ.spindel.spin.sync :refer [never]])
;; Wait for either result or cancellation
(spin
  (await (race
           (do-work)
           (never)))) ;; race cancels never when do-work completes
You can register custom effects that the spin macro recognizes as CPS breakpoints.
(require '[org.replikativ.spindel.engine.effects :as effects])
;; Register a synchronous effect
(effects/register-effect-by-symbol!
  'my.ns/my-effect ;; fully-qualified symbol
  (reify effects/PEffectHandler
    (handle-effect [_ context args resolve reject]
      (try
        (let [result (do-something (:value args))]
          (resolve result))
        (catch Exception e
          (reject e)))))
  'my.ns/my-adapter) ;; adapter: (fn [args-vec] args-map)
The adapter function converts the positional arguments from the call site into a map:
(defn my-adapter [args]
  {:value (first args)})
Once registered, use the effect inside spins:
(spin
  (let [result (my.ns/my-effect some-value)]
    (process result)))
The spin macro detects my.ns/my-effect as a registered breakpoint and transforms it into a CPS call to your handler.
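To make the data flow from call site to adapter to handler concrete, here is a library-free model of it. Everything in it is an assumption made for illustration: registry, register!, and invoke-effect are hypothetical names, the handler is a plain function rather than a PEffectHandler, and the real engine's dispatch is not shown.

```clojure
;; Toy registry: effect symbol -> {:adapter f, :handler g}.
(def registry (atom {}))

(defn register! [sym adapter handler]
  (swap! registry assoc sym {:adapter adapter :handler handler}))

(defn invoke-effect
  "Adapts the positional args, then calls the handler with resolve/reject."
  [sym args resolve reject]
  (let [{:keys [adapter handler]} (get @registry sym)]
    (handler (adapter args) resolve reject)))

(register! 'my.ns/my-effect
           (fn [args] {:value (first args)})      ; args vector -> args map
           (fn [{:keys [value]} resolve reject]
             (try
               (resolve (inc value))
               (catch Exception e
                 (reject e)))))

(def outcome (atom nil))

(invoke-effect 'my.ns/my-effect [41]
               #(reset! outcome [:ok %])
               #(reset! outcome [:error %]))
@outcome ;; => [:ok 42]
```

Passing a non-number such as "x" makes inc throw, so the handler calls reject and outcome becomes an [:error ...] pair instead.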
For performance, you can register a direct handler — a function called directly instead of going through protocol dispatch:
(effects/register-effect-by-symbol!
  'my.ns/my-effect
  :direct ;; marker, not PEffectHandler
  'my.ns/my-adapter
  'my.ns/my-direct-handler) ;; (fn [value spin-id loc resolve reject] ...)
Direct handlers receive arguments already adapted, plus spin context. The built-in await and track use direct handlers for performance.
await's cancellation check handles whole-spin cancellation
(the entire body bails with ::spin-cancelled). There is a second,
finer-grained mechanism for cont-level cancellation — relevant only
when you write custom effect handlers that hand a raw resolve closure
to an external resource (Deferred's :pending list, Mailbox waiter
struct, plain-fn awaitable callbacks).
The problem: when a parent body's earlier track continuation
re-resumes (because a tracked signal changed), the engine truncates
later conts — including the engine-side await cont. But the external
resource still holds the raw resolve closure in its pending list /
waiter struct. If the resource later fires, both the orphaned closure
and the new closure (registered by the parent's re-run) advance their
respective body slices to outer-resolve. Pure bodies waste work;
side-effecting bodies fire those side effects twice.
The fix — built into the standard
effects/await.cljc::cancellable-external-pair and used automatically
by await-deferred, await-mailbox, and the plain-fn awaitable path —
wraps every raw resolve / reject in a cancellation gate:
- Each wrapped pair gets its own cancel-token (UUID).
- Each wrapped closure reads (ec/current-execution-context) at
  call time and gates on whether the token is in that context's
  :engine/cancelled-tokens set. If yes, the closure no-ops.
- Cancellation goes through :cancel!, which writes the
  token to whatever execution context invokes it.

You only interact with this if you're building a new external awaitable. The two helpers to know:
- effects/await.cljc::cancellable-external-pair returns
  [wrapped-resolve wrapped-reject cancel-token]: call this with
  the parent's resolve/reject and pass the wrapped pair to your
  resource.
- Thread the cancel-token through engine.core/*external-await-cancel-token*
  into the waiter struct, and have your consumer skip cancelled
  waiters BEFORE consuming the event (see Mailbox above).

Deferred and plain-fn awaitables don't need the third token-threading
step because they deliver to all pending closures (no consumption).
engine.core/*external-await-cancel-token* is bound to nil for
them.
The fork-safety reasoning (why the cancellation set lives in engine
state at :engine/cancelled-tokens rather than a closure-captured
volatile) is in the source comments at
src/org/replikativ/spindel/effects/await.cljc —
cancellable-external-pair carries the full design as a block
comment above the function.
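The gate itself can be illustrated with a small, library-free model of the double-fire scenario. It is a sketch under stated assumptions, not the library's cancellable-external-pair: gated-pair is a hypothetical name, and a global atom stands in for the :engine/cancelled-tokens set that the real closures read from the current execution context.

```clojure
;; Stand-in for the :engine/cancelled-tokens set in engine state.
(def cancelled-tokens (atom #{}))

(defn gated-pair
  "Hypothetical analogue of cancellable-external-pair.
  Returns [wrapped-resolve wrapped-reject cancel-token]."
  [resolve reject]
  (let [token (java.util.UUID/randomUUID)
        gate  (fn [f]
                (fn [v]
                  ;; Checked at call time, not when the closure is created.
                  (when-not (contains? @cancelled-tokens token)
                    (f v))))]
    [(gate resolve) (gate reject) token]))

(def log (atom []))      ; records which body slices advanced
(def pending (atom []))  ; the external resource's pending list

;; First run registers a closure, then is truncated by a re-run.
(let [[resolve-1 _ token-1] (gated-pair #(swap! log conj [:run-1 %]) identity)]
  (swap! pending conj resolve-1)
  (swap! cancelled-tokens conj token-1))

;; The re-run registers a fresh closure with its own token.
(let [[resolve-2 _ _] (gated-pair #(swap! log conj [:run-2 %]) identity)]
  (swap! pending conj resolve-2))

;; The resource fires every pending closure; only the live one advances.
(doseq [f @pending] (f 42))
@log ;; => [[:run-2 42]]
```

Without the gate, both closures would append to the log, which is the duplicated side effect described in the problem statement above.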