JVM-only blocking convenience layer over `nats-cljc.core` (ADR 0008): the same verb names, but synchronous. A caller switches semantics by swapping a single require. A plain `.clj` file (not a `.cljc` with reader conditionals), so shadow-cljs never compiles it — the pull API is JVM-only by construction and has no ClojureScript counterpart, which is the reason this layer exists. A pure consumer of the portable core: it owns no jnats interop and reaches no protocol primitive. One-shots block and unwrap the core's CompletableFuture so they throw the canonical `ex-info` directly; subscriptions get a pull model the portable push core structurally cannot offer.
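The "block and unwrap" behaviour described for one-shots can be sketched with plain JVM interop. This is only an illustration under assumptions: `await!` is a hypothetical helper, not a function of this library, and the failed future stands in for a rejected core call.

```clojure
(import '(java.util.concurrent CompletableFuture ExecutionException))

(defn await!
  "Block on `fut` and return its value. If it completed exceptionally,
  rethrow the underlying cause rather than the ExecutionException wrapper.
  Hypothetical helper, not part of the library."
  [fut]
  (try
    (.get fut)
    (catch ExecutionException e
      (throw (or (.getCause e) e)))))

;; A future that failed with an ex-info, standing in for a rejected core call.
(def failed
  (doto (CompletableFuture.)
    (.completeExceptionally (ex-info "no responders" {:type :no-responders}))))

;; The caller catches the ex-info itself and can dispatch on its :type.
(def caught-type
  (try
    (await! failed)
    (catch clojure.lang.ExceptionInfo e
      (:type (ex-data e)))))
```

Unwrapping the cause means a synchronous caller can dispatch on the same `:type` key an async caller would see in a rejection.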
(active? sub)
True while `sub` is still delivering: either it has not ended, or its buffer still holds undelivered messages. False once the sub has ended AND the buffer is drained (the poison sentinel sits alone at the tail).
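The interplay between an ended flag, a poison sentinel, and a nil-returning take can be modelled with a toy buffer. The sketch below is not the library's implementation; `make-sub`, `end!`, `take!`, and `still-active?` are hypothetical names chosen for illustration.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

;; Unique end-of-stream marker; it is never handed to the consumer.
(def poison (Object.))

(defn make-sub
  "A toy pull sub: a bounded queue plus an `ended` flag."
  [capacity]
  {:queue (LinkedBlockingQueue. (int capacity))
   :ended (atom false)})

(defn end!
  "Mark the toy sub as ended and append the sentinel.
  Blocks if the queue is full; the demo below never fills it."
  [{:keys [queue ended]}]
  (reset! ended true)
  (.put queue poison))

(defn take!
  "Next message, or nil on timeout and on end-of-stream."
  [{:keys [queue]} timeout-ms]
  (let [m (.poll queue (long timeout-ms) TimeUnit/MILLISECONDS)]
    (cond
      (nil? m) nil                                        ; timed out
      (identical? m poison) (do (.put queue poison) nil)  ; keep sentinel at the tail
      :else m)))

(defn still-active?
  "True until the toy sub has ended AND only the sentinel remains."
  [{:keys [queue ended]}]
  (or (not @ended)
      (not (identical? poison (.peek queue)))))

;; One buffered message, then the sub ends.
(def sub (make-sub 8))
(.put (:queue sub) {:subject "demo" :data "a"})
(end! sub)
(def r1 [(still-active? sub) (take! sub 0)]) ; ended, but a message is still buffered
(def r2 [(take! sub 0) (still-active? sub)]) ; nil here means end-of-stream

;; A live sub with nothing buffered: nil here is only a timeout.
(def idle (make-sub 8))
(def r3 [(take! idle 0) (still-active? idle)])
```

In this model a nil from `take!` is ambiguous on its own; checking the activity predicate afterwards tells a timeout apart from end-of-stream.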
(close conn)
Close `conn` (blocking): returns once the connection is fully closed. Ends — and poisons — every pull sub opened on the connection.
(connect opts)
Open a connection (blocking): the synchronous twin of `core/connect`. Returns the Connection directly rather than a promise of it. Closing or draining the returned connection poisons every pull sub opened on it, so their parked `take-message`/`messages` consumers wake (ADR 0008); this is wired by wrapping the caller's `:on-status` to act on the `:closed` event.
(drain conn-or-sub)
Drain a connection or a single pull sub (blocking), returning nil once draining completes. Draining a pull sub is graceful (the higher-level sibling of `unsubscribe`): it flushes the already-buffered messages before ending, rather than dropping them. Flushing requires a concurrent consumer (another thread in `take-message`/`messages`) unless the whole backlog fits below `:capacity`; with the buffer at or above capacity the call blocks until a consumer makes room, so a single-threaded "drain then consume" deadlocks once the backlog reaches capacity. Draining a connection stops and closes it, which poisons every pull sub opened on it.
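The reason a full buffer needs a concurrent consumer can be seen with a plain bounded queue. This is a model of the situation, not the library's code; the queue, the `:end` marker, and the timeouts are all assumptions made for the demonstration, and a timed `offer` is used so the example gives up instead of hanging.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

;; A buffer of capacity 2 that is already full.
(def buf (LinkedBlockingQueue. 2))
(.put buf :m1)
(.put buf :m2)

;; No consumer: nothing makes room, so the timed offer gives up.
;; An untimed put here would block forever on a single thread.
(def without-consumer
  (.offer buf :end 50 TimeUnit/MILLISECONDS))

;; A consumer on another thread takes one message and frees a slot.
(def taken (promise))
(doto (Thread. (fn [] (deliver taken (.take buf))))
  (.setDaemon true)
  (.start))

(def with-consumer
  (.offer buf :end 2000 TimeUnit/MILLISECONDS))
```

The first offer fails because the only thread that could make room is the one waiting; the second succeeds once another thread takes `:m1`.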
(flush conn)
Flush `conn` (blocking): returns once the server has processed everything buffered on the connection.
(messages sub)
A reducible (IReduceInit) view of `sub`'s decoded messages: `reduce`/`run!` over it pulls each message with `take-message` and terminates on its own at end-of-stream (the sub's teardown). Use `reduce`/`run!`, not a bare `doseq`.
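A reducible view of this kind can be sketched by reifying `clojure.lang.IReduceInit` over a pull function. The names `reducible-of`, `pull!`, and `end` are hypothetical, and the in-memory source merely stands in for a sub; the library's own view may differ in detail.

```clojure
(defn reducible-of
  "Reducible view over `pull!`, a zero-arg fn that returns the next item,
  or `end` once the stream is finished. Hypothetical helper."
  [pull! end]
  (reify clojure.lang.IReduceInit
    (reduce [_ f init]
      (loop [acc init]
        (let [m (pull!)]
          (if (identical? m end)
            acc                          ; end-of-stream: stop on our own
            (let [acc' (f acc m)]
              (if (reduced? acc')
                @acc'                    ; honour early termination
                (recur acc')))))))))

;; An in-memory stand-in for a sub's buffer.
(def end (Object.))
(def pending (atom [{:data 1} {:data 2} {:data 3}]))

(defn pull! []
  (let [[old _] (swap-vals! pending #(vec (rest %)))]
    (if (seq old) (first old) end)))

(def view (reducible-of pull! end))

;; reduce drives the pulls and stops when `end` is seen.
(def total (reduce (fn [acc m] (+ acc (:data m))) 0 view))
```

An object that implements only `IReduceInit` is not seqable, so in this sketch `doseq` cannot walk it, while `reduce`, `run!`, and `into` can.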
Publish `data` to `subject` — already synchronous, re-exported unchanged.
Reply to a request message — already synchronous, re-exported unchanged.
(request conn subject data)
(request conn subject data opts)
Send a request and block for the decoded reply (the synchronous twin of `core/request`). On failure throws the bare canonical `ex-info` (`:no-responders` or `:timeout`), the same `:type` the async core's promise rejects with. The 3-arity defaults `opts` to `{}`.
(subscribe conn subject)
(subscribe conn subject {:keys [capacity] :or {capacity 1024} :as opts})
Subscribe to `subject`, returning an opaque pull handle (no handler arg). Each matching message is decoded with the connection's codec and enqueued on a bounded buffer that `take-message` drains one at a time. `opts` may set `:capacity` (the buffer's bound, default 1024; a non-positive value throws a `:type :invalid-capacity` ex-info — there is no unbounded escape hatch), plus the core's `:codec`/`:queue`/`:on-error`/`:max-pending`, passed through unchanged. A full buffer blocks the feeding dispatcher — backpressure, never drop (ADR 0008). Keep `:capacity` greater than any `max` armed via `unsubscribe` so a graceful auto-end still delivers all N buffered messages; at or below `max`, a full buffer evicts the oldest to keep teardown non-blocking (ADR 0013).
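How the `:capacity` default and its validation fit together can be sketched as below. `resolve-capacity` is a hypothetical helper, the error message and the extra `:capacity` key in the ex-data are assumptions, and rejecting non-integers goes beyond what the docstring states (it only mentions non-positive values).

```clojure
(defn resolve-capacity
  "Pick :capacity out of an opts map, defaulting to 1024, and reject
  anything that is not a positive integer. Hypothetical helper."
  [{:keys [capacity] :or {capacity 1024}}]
  (when-not (pos-int? capacity)
    (throw (ex-info "capacity must be a positive integer"
                    {:type :invalid-capacity :capacity capacity})))
  capacity)

;; No :capacity given: the destructuring default applies.
(def default-capacity (resolve-capacity {}))

;; Other keys are ignored here; a real subscribe would pass them to the core.
(def custom-capacity (resolve-capacity {:capacity 16 :queue "workers"}))

;; A non-positive bound is rejected with a typed ex-info.
(def rejected
  (try
    (resolve-capacity {:capacity 0})
    (catch clojure.lang.ExceptionInfo e
      (:type (ex-data e)))))
```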
(take-message sub)
(take-message sub timeout-ms)
Pull the next message off `sub`'s buffer, blocking the calling thread. With `timeout-ms`, block at most that long (0 = poll) and return nil on timeout. Returns the decoded `{:subject :data :reply (:headers)}` map, or nil for both a timeout and end-of-stream; disambiguate with `active?`.
(unsubscribe sub)
(unsubscribe sub max)
End `sub` abruptly, returning nil synchronously (the lower-level sibling of `drain`): stop the inner subscription, drop any not-yet-taken messages, and poison the buffer so a parked `take-message` wakes and a parked producer unblocks. Idempotent.
With `max`, the sub auto-unsubscribes once `max` messages have arrived over its lifetime, counted from subscription start, consistent with the async core's `[sub max]` (ADR 0008/0012). Unlike the abrupt arity, the already-buffered N are still delivered (the consumer sees exactly N, in order) before `take-message`/`messages` reach end-of-stream. This is guaranteed when `:capacity` > `max`; on a full buffer (a `:capacity` ≤ `max` with a lagging consumer) the oldest are evicted to keep teardown non-blocking (ADR 0013). If the sub has already received `max`, it ends now. `max` must be a positive integer no greater than 2147483647 (the JVM `Dispatcher.unsubscribe(sub, int)` cap), enforced before the native call for parity with core; anything else throws a `:type :invalid-max` ex-info.
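The trade-off between a non-blocking teardown and a full buffer can be modelled with a bounded queue: if the sentinel does not fit, the oldest entry is dropped to make room. `poison-non-blocking!` is a hypothetical name and this is only one way such eviction might work, not the library's code.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(def poison (Object.))

(defn poison-non-blocking!
  "Append `poison` to `q` without ever blocking: while the queue is full,
  evict the oldest entry and try again. Hypothetical helper."
  [^LinkedBlockingQueue q poison]
  (loop []
    (when-not (.offer q poison)
      (.poll q)   ; drop the oldest message to free a slot
      (recur))))

;; capacity = 2 with 2 buffered messages: no room for the sentinel.
(def tight (LinkedBlockingQueue. 2))
(.put tight :m1)
(.put tight :m2)
(poison-non-blocking! tight poison)
(def tight-contents (vec tight))

;; capacity = 3 with 2 buffered messages: the sentinel fits, nothing is lost.
(def roomy (LinkedBlockingQueue. 3))
(.put roomy :m1)
(.put roomy :m2)
(poison-non-blocking! roomy poison)
(def roomy-contents (vec roomy))
```

With two messages buffered, a capacity of 2 loses `:m1` while a capacity of 3 keeps both, which mirrors the advice to keep `:capacity` greater than `max`.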
Current library version — re-exported unchanged.