Liking cljdoc? Tell your friends :D

nats-cljc.blocking.core

JVM-only blocking convenience layer over nats-cljc.core (ADR 0008): the same verb names, but synchronous. A caller switches semantics by swapping a single require. A plain .clj file (not a .cljc with reader conditionals), so shadow-cljs never compiles it — the pull API is JVM-only by construction and has no ClojureScript counterpart, which is the reason this layer exists.

A pure consumer of the portable core: it owns no jnats interop and reaches no protocol primitive. One-shots block and unwrap the core's CompletableFuture so they throw the canonical ex-info directly; subscriptions get a pull model the portable push core structurally cannot offer.

JVM-only blocking convenience layer over `nats-cljc.core` (ADR 0008): the same
verb names, but synchronous. A caller switches semantics by swapping a single
require. A plain `.clj` file (not a `.cljc` with reader conditionals), so
shadow-cljs never compiles it — the pull API is JVM-only by construction and
has no ClojureScript counterpart, which is the reason this layer exists.

A pure consumer of the portable core: it owns no jnats interop and reaches no
protocol primitive. One-shots block and unwrap the core's CompletableFuture so
they throw the canonical `ex-info` directly; subscriptions get a pull model the
portable push core structurally cannot offer.
raw docstring

active?clj

(active? sub)

True while sub is still delivering: either it has not ended, or its buffer still holds undelivered messages. False once the sub has ended AND the buffer is drained (the poison sentinel sits alone at the tail).

True while `sub` is still delivering: either it has not ended, or its buffer
still holds undelivered messages. False once the sub has ended AND the buffer
is drained (the poison sentinel sits alone at the tail).
sourceraw docstring

closeclj

(close conn)

Close conn (blocking): returns once the connection is fully closed. Ends — and poisons — every pull sub opened on the connection.

Close `conn` (blocking): returns once the connection is fully closed. Ends —
and poisons — every pull sub opened on the connection.
sourceraw docstring

connectclj

(connect opts)

Open a connection (blocking): the synchronous twin of core/connect. Returns the Connection directly rather than a promise of it. Closing or draining the returned connection poisons every pull sub opened on it, so their parked take-message/messages consumers wake (ADR 0008); this is wired by wrapping the caller's :on-status to act on the :closed event.

Open a connection (blocking): the synchronous twin of `core/connect`. Returns
the Connection directly rather than a promise of it. Closing or draining the
returned connection poisons every pull sub opened on it, so their parked
`take-message`/`messages` consumers wake (ADR 0008); this is wired by wrapping
the caller's `:on-status` to act on the `:closed` event.
sourceraw docstring

drainclj

(drain conn-or-sub)

Drain a connection or a single pull sub (blocking), returning nil once draining completes. Draining a pull sub is graceful — the lower-bound sibling of unsubscribe: it flushes the already-buffered messages before ending, rather than dropping them. Flushing requires a concurrent consumer (another thread in take-message/messages) unless the whole backlog fits below :capacity; with the buffer at or above capacity the call blocks until a consumer makes room, so a single-threaded "drain then consume" deadlocks once the backlog reaches capacity. Draining a connection stops and closes it, which poisons every pull sub opened on it.

Drain a connection or a single pull sub (blocking), returning nil once draining
completes. Draining a pull sub is graceful — the lower-bound sibling of
`unsubscribe`: it flushes the already-buffered messages before ending, rather
than dropping them. Flushing requires a concurrent consumer (another thread in
`take-message`/`messages`) unless the whole backlog fits below `:capacity`; with
the buffer at or above capacity the call blocks until a consumer makes room, so a
single-threaded "drain then consume" deadlocks once the backlog reaches
capacity. Draining a connection stops and closes it, which poisons every pull sub
opened on it.
sourceraw docstring

flushclj

(flush conn)

Flush conn (blocking): returns once the server has processed everything buffered on the connection.

Flush `conn` (blocking): returns once the server has processed everything
buffered on the connection.
sourceraw docstring

messagesclj

(messages sub)

A reducible (IReduceInit) view of sub's decoded messages: reduce/run! over it pulls each message with take-message and terminates on its own at end-of-stream (the sub's teardown). Use reduce/run!, not a bare doseq.

A reducible (IReduceInit) view of `sub`'s decoded messages: `reduce`/`run!`
over it pulls each message with `take-message` and terminates on its own at
end-of-stream (the sub's teardown). Use `reduce`/`run!`, not a bare `doseq`.
sourceraw docstring

publishclj

Publish data to subject — already synchronous, re-exported unchanged.

Publish `data` to `subject` — already synchronous, re-exported unchanged.
sourceraw docstring

replyclj

Reply to a request message — already synchronous, re-exported unchanged.

Reply to a request message — already synchronous, re-exported unchanged.
sourceraw docstring

requestclj

(request conn subject data)
(request conn subject data opts)

Send a request and block for the decoded reply (the synchronous twin of core/request). On failure throws the bare canonical ex-info:no-responders or :timeout — the same :type the async core's promise rejects with. The 3-arity defaults opts to {}.

Send a request and block for the decoded reply (the synchronous twin of
`core/request`). On failure throws the bare canonical `ex-info` — `:no-responders`
or `:timeout` — the same `:type` the async core's promise rejects with. The
3-arity defaults `opts` to `{}`.
sourceraw docstring

subscribeclj

(subscribe conn subject)
(subscribe conn subject {:keys [capacity] :or {capacity 1024} :as opts})

Subscribe to subject, returning an opaque pull handle (no handler arg). Each matching message is decoded with the connection's codec and enqueued on a bounded buffer that take-message drains one at a time. opts may set :capacity (the buffer's bound, default 1024; a non-positive value throws a :type :invalid-capacity ex-info — there is no unbounded escape hatch), plus the core's :codec/:queue/:on-error/:max-pending, passed through unchanged. A full buffer blocks the feeding dispatcher — backpressure, never drop (ADR 0008). Keep :capacity greater than any max armed via unsubscribe so a graceful auto-end still delivers all N buffered messages; at or below max, a full buffer evicts the oldest to keep teardown non-blocking (ADR 0013).

Subscribe to `subject`, returning an opaque pull handle (no handler arg). Each
matching message is decoded with the connection's codec and enqueued on a
bounded buffer that `take-message` drains one at a time. `opts` may set
`:capacity` (the buffer's bound, default 1024; a non-positive value throws a
`:type :invalid-capacity` ex-info — there is no unbounded escape hatch), plus
the core's `:codec`/`:queue`/`:on-error`/`:max-pending`, passed through
unchanged. A full buffer blocks the feeding dispatcher — backpressure, never
drop (ADR 0008). Keep `:capacity` greater than any `max` armed via `unsubscribe`
so a graceful auto-end still delivers all N buffered messages; at or below `max`,
a full buffer evicts the oldest to keep teardown non-blocking (ADR 0013).
sourceraw docstring

take-messageclj

(take-message sub)
(take-message sub timeout-ms)

Pull the next message off sub's buffer, blocking the calling thread. With timeout-ms, block at most that long (0 = poll) and return nil on timeout. Returns the decoded {:subject :data :reply (:headers)} map, or nil for both a timeout and end-of-stream — disambiguate with active?.

Pull the next message off `sub`'s buffer, blocking the calling thread. With
`timeout-ms`, block at most that long (0 = poll) and return nil on timeout.
Returns the decoded `{:subject :data :reply (:headers)}` map, or nil for both a
timeout and end-of-stream — disambiguate with `active?`.
sourceraw docstring

unsubscribeclj

(unsubscribe sub)
(unsubscribe sub max)

End sub abruptly, returning nil synchronously (the lower-level sibling of drain): stop the inner subscription, drop any not-yet-taken messages, and poison the buffer so a parked take-message wakes and a parked producer unblocks. Idempotent.

With max, the sub auto-unsubscribes once max messages have arrived over its lifetime — counted from subscription start, consistent with the async core's [sub max] (ADR 0008/0012). Unlike the abrupt arity, the already-buffered N are still delivered (the consumer sees exactly N, in order) before take-message/messages reach end-of-stream — guaranteed when :capacity > max; on a full buffer (a :capacitymax with a lagging consumer) the oldest are evicted to keep teardown non-blocking (ADR 0013). If the sub has already received max, it ends now. max must be a positive integer no greater than 2147483647 (the JVM Dispatcher.unsubscribe(sub, int) cap), enforced before the native call — parity with core; anything else throws a :type :invalid-max ex-info.

End `sub` abruptly, returning nil synchronously (the lower-level sibling of
`drain`): stop the inner subscription, drop any not-yet-taken messages, and
poison the buffer so a parked `take-message` wakes and a parked producer
unblocks. Idempotent.

With `max`, the sub auto-unsubscribes once `max` messages have arrived over its
lifetime — counted from subscription start, consistent with the async core's
`[sub max]` (ADR 0008/0012). Unlike the abrupt arity, the already-buffered N are
still delivered (the consumer sees exactly N, in order) before
`take-message`/`messages` reach end-of-stream — guaranteed when `:capacity` >
`max`; on a full buffer (a `:capacity` ≤ `max` with a lagging consumer) the
oldest are evicted to keep teardown non-blocking (ADR 0013). If the sub has
already received `max`, it ends now. `max` must be a positive integer no greater than 2147483647
(the JVM `Dispatcher.unsubscribe(sub, int)` cap), enforced before the native call
— parity with core; anything else throws a `:type :invalid-max` ex-info.
sourceraw docstring

versionclj

Current library version — re-exported unchanged.

Current library version — re-exported unchanged.
sourceraw docstring

cljdoc builds & hosts documentation for Clojure/Script libraries

Keyboard shortcuts
Ctrl+kJump to recent docs
Move to previous article
Move to next article
Ctrl+/Jump to the search field
× close