Spindel provides combinators for composing spins with concurrency, timing, and error handling.
(require '[org.replikativ.spindel.spin.combinators
:refer [parallel race sleep timeout debounce throttle
sample relieve accumulate]]
'[org.replikativ.spindel.spin.core :refer [ok error attempt absolve]]
'[org.replikativ.spindel.incremental.interval :as iv])
parallel — Concurrent ExecutionExecute multiple spins concurrently. Returns a vector of results when all complete.
(spin
(let [[a b c] (await (parallel
(fetch-user user-id)
(fetch-posts user-id)
(fetch-settings user-id)))]
{:user a :posts b :settings c}))
parallel updatesrace — First to CompleteExecute multiple spins concurrently. Returns the result of the first to complete; losers are cancelled.
(spin
(await (race
(fetch-from-primary)
(fetch-from-replica))))
await pointsleep — Time DelayCreate a spin that completes after a duration (milliseconds).
(spin
(await (sleep 1000)) ;; wait 1 second, returns nil
(await (sleep 500 :done))) ;; wait 500ms, returns :done
timeout — Deadline with FallbackRace a spin against a deadline. Returns fallback value if the spin doesn't complete in time.
(spin
(let [data (await (timeout (fetch-remote-data) 5000 :cached-default))]
(process data)))
Equivalent to (race source-spin (sleep timeout-ms fallback-value)).
These combinators control how frequently reactive re-executions deliver results.
debounce — Wait for Quiet PeriodDelay delivery until a quiet period elapses. Useful for text input — wait until the user stops typing.
(spin
(let [content (await (debounce (track content-signal) 300))]
(search-preview content)))
When the signal changes, the timer restarts. The spin only receives the value after 300ms of no changes.
throttle — Max FrequencyLimit updates to a maximum frequency (Hz). A merge function combines intermediate values.
;; Limit to 60 updates/second, keep latest value
(spin
(let [pos (await (throttle (track mouse-signal) 60 (fn [_ new] new)))]
(update-cursor pos)))
;; Collect all values between deliveries
(spin
(let [events (await (throttle (track event-signal) 10
(fn [acc new] (conj (or acc []) new))))]
(process-batch events)))
The merge function signature: (fn [accumulated-value new-value] -> merged-value)
sample — Fixed Interval PollingTake the latest value at fixed intervals, ignoring intermediate changes.
;; Sync state to server every 5 seconds
(spin
(let [state (await (sample (track app-state) 5000))]
(persist-to-server! state)))
relieve — Drop Intermediate ValuesWhen the observer is slower than the producer, drop intermediate values and always deliver the latest.
;; Real-time display where freshness matters more than completeness
(spin
(let [data (await (relieve (track sensor-signal)))]
(render-dashboard data)))
accumulate — Preserve DeltasWhen used with throttle, intermediate deltas are normally lost. accumulate preserves all deltas by merging intervals:
;; Without accumulate — intermediate deltas may be lost
(spin
(let [items (await (throttle (track items-signal) 10 (fn [_ new] new)))]
;; deltas may be incomplete
))
;; With accumulate — all deltas preserved
(spin
(let [iv (await (throttle
(accumulate items-signal iv/merge-intervals)
10
(fn [_ new] new)))]
;; iv contains ALL deltas since last delivery. The raw
;; `:add`/`:remove`/`:update` walk below works on
;; deltaable-collection signals (the input edge); typed
;; combinators output algebra records — pipe `iv` into
;; `ifor-each` / `imap` / `ifilter` rather than walking
;; deltas by hand for any DOM-bound consumer.
(doseq [{:keys [delta path value]} (iv/get-deltas iv)]
(case delta
:add (render-item-at path value)
:remove (remove-item-at path)
:update (update-item-at path value)))))
merge-intervals is associative (CRDT-like): merge(merge(a,b),c) = merge(a,merge(b,c)). It preserves the original baseline (:old), uses the latest value (:new), and concatenates + compacts deltas.
Spins produce Result values — either success or error:
(require '[org.replikativ.spindel.spin.core :refer [ok error ok? error? unwrap match]])
;; Create results
(ok 42) ;; success
(error (ex-info "oops" {})) ;; failure
;; Check results
(ok? (ok 42)) ;; => true
(error? (ok 42)) ;; => false
;; Unwrap (throws on error)
(unwrap (ok 42)) ;; => 42
(unwrap (error (ex-info "oops" {}))) ;; throws!
;; Pattern match
(match result
(fn [value] (println "Success:" value))
(fn [err] (println "Error:" err)))
attempt — Capture ErrorsWrap a spin's result so errors don't propagate. The result becomes a zero-argument function that either returns the value or throws:
(spin
(let [result-fn (await (attempt risky-spin))]
(try
(result-fn) ;; returns value or throws
(catch Exception e
:fallback))))
absolve — Unwrap Captured ErrorsThe inverse of attempt — calls the wrapped function, converting captured errors back to thrown exceptions:
(spin
(let [value (await (absolve safe-spin))]
;; If safe-spin returned an error-wrapping function, it throws here
(process value)))
Spins support cooperative cancellation:
(require '[org.replikativ.spindel.spin.core :refer [cancel-spin! spin-cancelled?]])
;; Cancel a spin (and all its observers)
(cancel-spin! my-spin)
;; Check cancellation
(spin-cancelled? my-spin) ;; => true
Cancellation is cooperative — spins check at await and track points. CPU-bound code without breakpoints won't be interrupted.
Spins are automatically cleaned up when garbage collected. For explicit cleanup:
(require '[org.replikativ.spindel.spin.core :refer [cleanup-spin!]])
;; Remove spin from dependency graph and signal observer lists
(cleanup-spin! my-spin)
;; Don't deref after cleanup!
Limit concurrent access with fork-safe semaphores:
(require '[org.replikativ.spindel.semaphore :refer [semaphore acquire release holding]])
(def sem (semaphore 3)) ;; max 3 concurrent
;; Manual acquire/release
(spin
(await (acquire sem))
(try
(do-limited-work)
(finally
(release sem))))
;; Or use `holding` for automatic release
(spin
(await (holding sem
(spin (do-limited-work)))))
@sem returns current available permitsawait, track, yieldaccumulateCan you improve this documentation?Edit on GitHub
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |