Portable public facade for JetStream (aliased `jet` — CLJS reserves `js` for host interop), the durable half of NATS. A thin `.cljc` surface mirroring `nats-cljc.core`: the same consumer code compiles and runs on the JVM, the browser, and Node. Requiring this namespace loads the per-leg JetStream impl — and, on CLJS, pulls the `@nats-io/jetstream` bundle bytes — which a core-only consumer who never requires it does not pay for (ADR 0016). The impl require is for that load side-effect only (it `extend`s the JetStream protocol onto the platform Connection record); this facade calls the record through the protocol.
(ack conn msg)
Acknowledge the delivered JetStream message `msg` as processed, on `conn` (the connection, explicit as in core `reply`), stopping its redelivery. Sugar over publish of the `+ACK` protocol payload to the message's ack subject (under `:js :ack-subject`) — never a native `.ack()`, so the wire bytes are identical on both legs (ADR 0019). Synchronous, returns nil, and idempotent: a redundant ack of an already-acked message is a harmless publish the server ignores, never a throw. Throws an ex-info `:type :no-ack-subject` when `msg` carries no ack subject (it is not a delivered JetStream message), rather than publishing to a nil subject — the `reply` `:no-reply-subject` precedent.
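The guard described above can be sketched in a few lines. This models only the ack-subject lookup and the error shape; `ack-target` is a hypothetical helper, not part of the facade, and it is not the library's implementation.

```clojure
;; Hypothetical helper: shows where an ack would be published and what
;; happens when the message has no ack subject. Illustrative only.
(defn ack-target
  "Returns the subject and payload an ack would publish, or throws an
  ex-info with :type :no-ack-subject when msg carries no ack subject."
  [msg]
  (if-let [subject (get-in msg [:js :ack-subject])]
    {:subject subject :payload "+ACK"}
    (throw (ex-info "message carries no ack subject"
                    {:type :no-ack-subject}))))
```

A caller that may receive non-JetStream messages could catch the ex-info and branch on `(:type (ex-data e))` rather than on the message text.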
(consume ordered handler)
(consume ordered handler opts)
(consume ctx stream consumer handler)
(consume ctx stream consumer handler opts)
Continuously deliver from the durable Consumer `consumer` on Stream `stream`
through the JetStream context `ctx`, invoking `handler` with one PURE-DATA
message `{:subject :data :js}` at a time (plus `:headers` when present, shape as
in `fetch`), and returning a platform-native promise that resolves to a handle —
drainable and unsubscribable exactly like a core Subscription, via `core/drain`
and `core/unsubscribe` (ADR 0018).
`handler` is the core handler contract (ADR 0007): it may return a promise, and
the runtime waits for that promise to settle before delivering the next message
AND refilling — per-message backpressure with no async dependency, the client's
read rate gating its own pull rate. There is NO `:max-pending`/`:slow-consumer`
in pull: unrequested messages simply wait on the server (ADR 0018).
`opts` are the refill knobs (ADR 0018): `:batch` (max messages per pull window,
default 100), `:threshold` (refill once the buffered count drops to it; a COUNT
portably — the JVM converts count->percent; each client's default is 75% of
`:batch`), `:expires-ms` (the pull window — also how long a `drain` may take to
wind down the open pull), `:idle-heartbeat-ms` (server liveness pulses while
idle; accepted on both legs, but the JVM client derives its own cadence from
`:expires-ms` — shape, not cadence, ADR 0006), `:max-bytes` (a BYTE window per
pull, mutually exclusive with the message-count window — nats.js forbids setting
both, so `:max-bytes` with `:batch` or `:threshold` is rejected), and `:codec` (a
per-call decode override). The promise rejects pre-flight with a validation
`:type :invalid-name` when `stream` or `consumer` is malformed,
`:invalid-threshold` when `:threshold` is not a positive integer no greater than
`:batch`, `:exclusive-window` when `:max-bytes` is combined with `:batch`/
`:threshold`, `:invalid-expires` when a supplied `:expires-ms` is below the
1000ms floor or not a whole number (ADR 0015) — and operationally with
`:consumer-not-found` when no such Consumer exists (ADR 0020).
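The pre-flight rules for the refill knobs can be read as a small decision function. The sketch below is one possible reading, not the library's code: `consume-opts-problem` is a hypothetical name, the order in which the checks run is an assumption, and it covers only the three opts-related types named above.

```clojure
;; Hypothetical validator modelling the documented pre-flight rules.
;; Returns the validation :type keyword, or nil when opts pass.
(defn consume-opts-problem
  [{:keys [batch threshold max-bytes expires-ms] :as opts}]
  (cond
    ;; The byte window and the message-count window are mutually exclusive.
    (and max-bytes (or (contains? opts :batch) (contains? opts :threshold)))
    :exclusive-window

    ;; :threshold must be a positive integer no greater than :batch
    ;; (the default :batch of 100 applies when none is supplied).
    (and (contains? opts :threshold)
         (not (and (pos-int? threshold) (<= threshold (or batch 100)))))
    :invalid-threshold

    ;; A supplied :expires-ms must be a whole number of at least 1000.
    (and (contains? opts :expires-ms)
         (not (and (integer? expires-ms) (>= expires-ms 1000))))
    :invalid-expires))
```

For example, `{:batch 50 :threshold 51}` yields `:invalid-threshold`, while `{:batch 50 :threshold 10 :expires-ms 5000}` yields nil.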
`:on-error` (named consumes) is the per-consume error sink (ADR 0020): the
consume-time side-band conditions — `:heartbeats-missed`, `:consumer-deleted`,
`:exceeded-limits`, with a backing-stream loss reusing `:stream-not-found` —
reach it as bare ex-infos, exactly like core's per-subscription
`:slow-consumer` row: this sink ONLY, dropped when unset, never the connection
`:on-status`. A handler throw or decode failure reaches the same sink, and
delivery continues. Terminal conditions (the Consumer or its backing Stream is
gone) additionally END the consume — the handle goes inactive — whether or not
`:on-error` is set.
On the handle, `core/drain` stops new pulls and settles once the consume winds
down (on the JVM buffered messages deliver first; on CLJS the buffer is
discarded — un-acked, so the server redelivers them); `core/unsubscribe` ends it
abruptly and idempotently, and takes no `max` (a consume has no auto-unsubscribe
count — passing one throws `:type :invalid-max`).
The handle-first arities consume from an `ordered-consumer` handle instead — same
handler contract, refill knobs, and drainable handle; deliveries arrive in stream
order with no acknowledgements to take.
(consumer-info ctx stream name)
Look up the Consumer named `name` on the Stream `stream` through the JetStream context `ctx`, returning a platform-native promise that resolves to the normalized ConsumerInfo map (see `create-consumer`). The promise rejects with an operational `:type :consumer-not-found` when no such Consumer exists, and pre-flight with a validation `:type :invalid-name` when `stream` or `name` is malformed (ADR 0015/0020).
(consumer-names ctx stream)
Enumerate the durable names of the Consumers on the Stream `stream` through the JetStream context `ctx`, returning a platform-native promise that resolves to a vector of name strings — the name projection of `list-consumers`, from which it is derived (nats.js exposes no names endpoint).
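The "name projection" amounts to taking `:name` from each ConsumerInfo map. A minimal sketch, assuming the infos have already been resolved from the `list-consumers` promise; `names-of` is a hypothetical helper, not a facade function.

```clojure
;; Hypothetical helper: projects the durable names out of a vector of
;; normalized ConsumerInfo maps (each carries a :name key).
(defn names-of
  [consumer-infos]
  (mapv :name consumer-infos))
```

Because the names are derived, a caller who already holds the full infos can apply the same projection locally instead of making a second call.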
(create-consumer ctx stream config)
Create a durable Consumer on the Stream named `stream` through the JetStream context
`ctx`, from the portable, CLOSED kebab `config` map, returning a platform-native
promise that resolves to the normalized ConsumerInfo map (`{:stream :name :config
:created :delivered :ack-floor :pending}`, `:created` an ISO-8601 string). Config
keys: `:name` (required — the durable), `:ack-policy` (`:none` | `:all` | `:explicit`),
`:deliver-policy` (`:all` | `:last` | `:new` | `:last-per-subject`), `:ack-wait-ms`
(integer milliseconds), `:max-deliver` (integer), and `:filter-subjects` (a vector of
subject strings). The map is closed: an unrecognized key rejects the promise with a
validation `:type :unknown-config-key`, and a malformed `:name` or `stream` with
`:invalid-name`, both pre-flight before any native call (ADR 0015). A config the
SERVER rejects rejects with an operational `:type :jetstream-api-error` carrying
`{:code :description}` (ADR 0020).
(create-stream ctx config)
Create a Stream on the JetStream context `ctx` from the portable, CLOSED kebab
`config` map, returning a platform-native promise that resolves to the
normalized StreamInfo map (`{:config :created :state}`, `:created` an ISO-8601
string). Config keys: `:name` (required), `:subjects`, `:storage` (`:file` |
`:memory`), `:retention` (`:limits` | `:interest` | `:work-queue`), and
`:max-age-ms` (integer milliseconds). The map is closed: an unrecognized key
rejects the promise with a validation `:type :unknown-config-key`, and a
malformed `:name` with `:invalid-name`, both pre-flight before any native call
(ADR 0015). A config the SERVER rejects (e.g. a subject overlap) rejects with an
operational `:type :jetstream-api-error` carrying `{:code :description}` — it is
detected after the native call, so it is operational, not validation (ADR 0020).
(delete-consumer ctx stream name)
Delete the Consumer named `name` on the Stream `stream` through the JetStream context `ctx`, returning a platform-native promise that resolves to nil once it is gone. The promise rejects with an operational `:type :consumer-not-found` when no such Consumer exists, and pre-flight with a validation `:type :invalid-name` when `stream` or `name` is malformed (ADR 0015/0020).
(delete-stream ctx name)
Delete the Stream named `name` on the JetStream context `ctx`, returning a platform-native promise that resolves to nil once it is gone. The promise rejects with an operational `:type :stream-not-found` when no such Stream exists, and pre-flight with a validation `:type :invalid-name` when `name` is malformed (ADR 0015/0020).
(double-ack conn msg)
(double-ack conn msg opts)
Acknowledge the delivered JetStream message `msg` on `conn` AND await the server's confirmation, returning a platform-native promise that resolves to true once the ack is known processed — where `ack` is fire-and-forget. Sugar over `request` of the `+ACK` protocol payload to the message's ack subject; the server's (empty) reply is the confirmation (ADR 0019). Named double-ack (the NATS community term), not jnats' ack-sync — ours is asynchronous. Idempotent: the server confirms a redundant ack too, so double-acking the same message again resolves true, never throws. `opts` may set `:timeout-ms` (default 5000); a confirmation that never arrives rejects with the core `:timeout`, and a message without an ack subject rejects with `:type :no-ack-subject` — on the returned promise, never a synchronous throw (ADR 0006).
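The difference from `ack` in error reporting (a rejected promise rather than a synchronous throw) can be illustrated on the JVM leg with a plain `CompletableFuture`. This is a sketch under assumptions: `double-ack-precheck` and `rejection-type` are hypothetical names, the resolved `true` merely stands in for the server's confirmation, and `CompletableFuture/failedFuture` needs Java 9 or later.

```clojure
(import '(java.util.concurrent CompletableFuture ExecutionException))

;; Hypothetical: never throws; a missing ack subject surfaces as a
;; rejected future instead.
(defn double-ack-precheck
  [msg]
  (if (get-in msg [:js :ack-subject])
    (CompletableFuture/completedFuture true)
    (CompletableFuture/failedFuture
     (ex-info "message carries no ack subject" {:type :no-ack-subject}))))

;; Hypothetical: blocks on the future and returns the rejection's :type,
;; or nil when the future resolved normally.
(defn rejection-type
  [^CompletableFuture fut]
  (try
    (.get fut)
    nil
    (catch ExecutionException e
      (:type (ex-data (.getCause e))))))
```

On the JVM the ex-info arrives wrapped in an `ExecutionException` when the future is dereferenced, so the `:type` is read from the cause.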
(fetch ordered)
(fetch ordered opts)
(fetch ctx stream consumer)
(fetch ctx stream consumer opts)
Fetch a bounded batch from the durable Consumer `consumer` on Stream `stream` through
the JetStream context `ctx`, returning a platform-native promise that resolves to a
vector of up to `:batch` PURE-DATA messages `{:subject :data :js}` (plus `:headers`
when the message carried some), each `:data` decoded with the context codec, in stream
order (ADR 0018). A delivered message is plain data — no native object — with its
JetStream metadata under `:js` `{:stream :consumer :stream-seq :delivery-seq :delivered
:pending :redelivered :timestamp :domain :ack-subject}`, `:timestamp` an ISO-8601 string
and `:redelivered` true once `:delivered` exceeds 1 (ADR 0019). `opts`: `:batch` (max
messages, default 100), `:expires-ms` (the window after which a batch shorter than
`:batch` settles with what it has; omitted, the window is the 30000ms default both
clients independently apply — jnats' FetchConsumeOptions and nats.js' fetch — so the
legs agree on the wait when a caller omits it), and `:codec` (a per-call decode
override). The
promise rejects pre-flight with a validation `:type :invalid-name` when `stream` or
`consumer` is malformed, and `:invalid-expires` when a supplied `:expires-ms` is below
the 1000ms floor both clients enforce or is not a whole number (ADR 0015).
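To make the message shape concrete, here is what one delivered message might look like, together with the `:redelivered` rule. Every value in `example-msg` is invented for illustration (including the nil `:domain`), and `redelivered?` is a hypothetical helper that only restates the documented rule.

```clojure
;; Invented example of a delivered message; only the key names come
;; from the description above.
(def example-msg
  {:subject "orders.new"
   :data    {:id 42}
   :js      {:stream       "ORDERS"
             :consumer     "worker"
             :stream-seq   7
             :delivery-seq 9
             :delivered    2
             :pending      0
             :redelivered  true
             :timestamp    "2024-01-01T00:00:00Z"
             :domain       nil
             :ack-subject  "ack.subject.example"}})

;; Hypothetical helper: true once the delivery count exceeds 1.
(defn redelivered?
  [msg]
  (> (get-in msg [:js :delivered] 0) 1))
```

Since the message is plain data, ordinary map functions such as `get-in` are enough to inspect it; no native message object is involved.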
The handle-first arities fetch from an `ordered-consumer` handle instead — same
message shape, opts, and `:invalid-expires` guard; the handle already names its
Stream and the library manages the ephemeral, so there is nothing to validate
by name.
(jetstream conn)
Obtain the JetStream context for `conn`, returning a platform-native promise (CompletableFuture on the JVM, js/Promise on CLJS) that resolves to a single context holding both the data plane (publish, pull) and the management plane (stream/consumer admin) — every JetStream operation flows through it (ADR 0017). Obtaining it verifies JetStream is enabled by forcing a JS-info round-trip on both legs, so the promise rejects with an `ex-info` `:type :jetstream-not-enabled` (err 10039) when the server/account has JetStream disabled — at the handle, never deferred to the first operation (ADR 0017/0020).
(list-consumers ctx stream)
Enumerate the Consumers on the Stream `stream` through the JetStream context `ctx`, returning a platform-native promise that resolves to a vector of normalized ConsumerInfo maps (see `create-consumer`), one per durable. The promise rejects pre-flight with a validation `:type :invalid-name` when `stream` is malformed (ADR 0015/0020).
(list-streams ctx)
Enumerate the Streams on the server through the JetStream context `ctx`, returning a platform-native promise that resolves to a vector of normalized StreamInfo maps (see `create-stream`), one per Stream.
(nak conn msg)
(nak conn msg opts)
Negatively acknowledge the delivered JetStream message `msg` on `conn`: signal
it was NOT processed, asking the server to redeliver it (immediately, or after
`:delay-ms` milliseconds when set in `opts`). Sugar over publish of the `-NAK`
protocol payload — `-NAK {"delay":ns}` with a delay — to the message's ack
subject (ADR 0019), shaped exactly as `ack`: synchronous, returns nil,
idempotent, and throws `:type :no-ack-subject` on a message without an ack
subject.
(next ordered)
(next ordered opts)
(next ctx stream consumer)
(next ctx stream consumer opts)
Poll a single message from the durable Consumer `consumer` on Stream `stream` through
the JetStream context `ctx`, returning a platform-native promise that resolves to ONE
PURE-DATA message `{:subject :data :js}` (plus `:headers` when present, shape as in
`fetch`), or nil when no message arrives within the poll window — an empty consumer
(ADR 0018). `opts`: `:expires-ms` (how long to wait for a message before resolving nil;
omitted, it waits the 30000ms default both clients independently apply — jnats'
no-arg next() and nats.js' next — so the legs agree on the wait) and `:codec`
(a per-call decode override). The promise
rejects pre-flight with a validation `:type :invalid-name` when `stream` or `consumer`
is malformed, and `:invalid-expires` when a supplied `:expires-ms` is below the 1000ms
floor both clients enforce or is not a whole number (ADR 0015).
The handle-first arities poll an `ordered-consumer` handle instead — same message
shape, opts, and `:invalid-expires` guard; successive polls continue from the
handle's tracked position, in stream order.
(ordered-consumer ctx stream)
(ordered-consumer ctx stream opts)
Create an Ordered consumer over the Stream `stream` through the JetStream context `ctx` — a specialized Ephemeral consumer for single-client, gap-free replay: server-managed, taking NO acknowledgements (ack policy none), and automatically recreated by the client should a sequence gap appear — returning a platform-native promise that resolves to an ordered pull handle. The handle plugs into the same pull triad as a named Consumer, via the handle-first arities: `(next handle opts)`, `(fetch handle opts)`, `(consume handle handler opts)` — same pure-data messages, same opts, same drainable consume handle (ADR 0018/0019). It leaves no durable Consumer behind: the underlying ephemeral is reclaimed by the server after its inactivity window. `opts` is the portable, CLOSED kebab map tuning what the handle replays: `:filter-subjects` (a vector of subject strings) and `:deliver-policy` (`:all` | `:last` | `:new` | `:last-per-subject`); everything else — name, ack policy, recreation — is the library's business. The promise rejects pre-flight with a validation `:type :invalid-name` when `stream` is malformed and `:unknown-config-key` for an unrecognized opts key (ADR 0015), and operationally with `:stream-not-found` when no such Stream exists — both legs round-trip stream info at creation (ADR 0020).
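A closed map means any key outside the documented set is an error. The following sketch shows one way such a check could look; `check-closed` and the `:keys` entry in the ex-data are hypothetical, and the sketch throws where the facade is described as rejecting its promise.

```clojure
;; The two opts keys documented for ordered-consumer.
(def ordered-opts-keys #{:filter-subjects :deliver-policy})

;; Hypothetical check: returns m unchanged when every key is allowed,
;; otherwise throws an ex-info with :type :unknown-config-key.
(defn check-closed
  [allowed m]
  (if-let [unknown (seq (remove allowed (keys m)))]
    (throw (ex-info "unrecognized config key"
                    {:type :unknown-config-key :keys (vec unknown)}))
    m))
```

With this check, `{:deliver-policy :new}` passes through unchanged, while `{:ack-policy :none}` fails, since the ack policy of an ordered consumer is not caller-configurable.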
(publish ctx subject data)
(publish ctx subject data {:keys [headers] :as opts})
Acked publish of `data` to `subject` through the JetStream context `ctx`, encoding
`data` with the context's default codec, returning a platform-native promise that
resolves to the normalized PubAck map `{:stream :seq :duplicate :domain}`. `opts`:
`:headers` (a map of case-sensitive string names to one or more string values, a
scalar normalized to a one-element vector); `:msg-id` (server-side dedup within the
stream's dedup window — the PubAck `:duplicate` is true on a retry); `:expect`
(`{:last-seq :last-msg-id :stream :last-subject-seq}`, optimistic-concurrency
assertions whose mismatch rejects with an operational `:type :wrong-last-sequence`);
`:timeout-ms` (a missing PubAck rejects rather than hangs); and `:codec` (a per-call
codec override — only `:data` is codec'd). `:msg-id`/`:expect` are the sanctioned
way to set reserved `Nats-*` headers, so a reserved key set directly in `:headers`
rejects pre-flight with a validation `:type :reserved-header`, and a malformed
header with `:invalid-header`, both before any native call (ADR 0015/0020).
(purge-stream ctx name)
Purge every message from the Stream named `name` on the JetStream context `ctx`,
keeping the Stream definition itself, returning a platform-native promise that
resolves to `{:purged <count>}` — the number of messages the server dropped. The
promise rejects with an operational `:type :stream-not-found` when no such
Stream exists, and pre-flight with a validation `:type :invalid-name` when
`name` is malformed (ADR 0015/0020).
(stream-info ctx name)
Look up the Stream named `name` on the JetStream context `ctx`, returning a platform-native promise that resolves to the normalized StreamInfo map (see `create-stream`). The promise rejects with an operational `:type :stream-not-found` when no such Stream exists, and pre-flight with a validation `:type :invalid-name` when `name` is malformed (ADR 0015/0020).

(stream-names ctx)

Enumerate the Stream names on the server through the JetStream context `ctx`, returning a platform-native promise that resolves to a vector of name strings. Unlike `consumer-names`, this rides each leg's dedicated names endpoint rather than deriving from `list-streams`, so it never pays the cost of fetching full StreamInfo maps.

(term conn msg)

Terminate the delivered JetStream message `msg` on `conn`: give up on it, so the server never redelivers it — `ack`'s terminal sibling for a message that was NOT processed and never will be. Sugar over publish of the `+TERM` protocol payload to the message's ack subject (ADR 0019), shaped exactly as `ack`: synchronous, returns nil, idempotent, and throws `:type :no-ack-subject` on a message without an ack subject.
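
The "sugar over publish" shape described above can be sketched in a few lines. This is an illustration under stated assumptions, not the library's implementation: the publish function is taken to have the shape `(publish conn subject payload)`, the ack subject is read with `(get-in msg [:js :ack-subject])`, and `recording-publish`, `publish-ack-payload`, and `term-sketch` are hypothetical names.

```clojure
;; Hypothetical stand-in for the core publish fn: it records calls instead of
;; touching a NATS connection, so the sketch runs without a server.
(def published (atom []))

(defn recording-publish [_conn subject payload]
  (swap! published conj [subject payload])
  nil)

(defn publish-ack-payload
  "Publish payload to the ack subject of msg, or throw when there is none.
  The [:js :ack-subject] path is an assumed reading of the message shape."
  [publish-fn conn msg payload]
  (if-let [subject (get-in msg [:js :ack-subject])]
    (do (publish-fn conn subject payload) nil) ; synchronous, returns nil
    (throw (ex-info "message carries no ack subject"
                    {:type :no-ack-subject}))))

(defn term-sketch
  "Terminate msg by publishing the +TERM payload to its ack subject."
  [conn msg]
  (publish-ack-payload recording-publish conn msg "+TERM"))
```

Calling `term-sketch` twice on the same message simply records two identical publishes, which mirrors why a redundant call is harmless rather than an error.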

(update-consumer ctx stream config)

Update an existing durable Consumer's configuration on the Stream named `stream`
through the JetStream context `ctx`, from the portable, CLOSED kebab `config` map
(same keys as `create-consumer`, `:name` naming the durable to change), returning
a platform-native promise that resolves to the normalized ConsumerInfo map
carrying the new active config. The keys present are MERGED over the Consumer's
current config — an absent key keeps its current value rather than reverting to a
server default — so an ack wait or delivery cap can change without restating the
whole config (ADR 0020). Updates are deliberate and separate from `create-consumer`,
which stays create-only (ADR 0021), mirroring the `create-stream`/`update-stream`
split. The map is closed: an unrecognized key rejects pre-flight with a validation
`:type :unknown-config-key`, and a malformed `:name` or `stream` with
`:invalid-name` (ADR 0015). The promise rejects with an operational `:type
:consumer-not-found` when no such Consumer exists, and `:jetstream-api-error`
carrying `{:code :description}` when the server rejects the change (e.g. an
immutable field) (ADR 0020).

(update-stream ctx config)

Update an existing Stream's configuration on the JetStream context `ctx` from the
portable, CLOSED kebab `config` map (same keys as `create-stream`, `:name`
naming the Stream to change), returning a platform-native promise that resolves
to the normalized StreamInfo map carrying the new active config. The keys
present are MERGED over the Stream's current config — an absent key keeps its
current value rather than reverting to a server default — so a retention or
limit can change without restating the whole config (ADR 0020). The map is
closed: an unrecognized key rejects pre-flight with a validation `:type
:unknown-config-key`, and a malformed `:name` with `:invalid-name` (ADR 0015).
The promise rejects with an operational `:type :stream-not-found` when no such
Stream exists, and `:jetstream-api-error` carrying `{:code :description}` when
the server rejects the change (e.g. an immutable field) (ADR 0020).

(working conn msg)

Signal that the delivered JetStream message `msg` is still being processed, on `conn`, postponing the consumer's ack-wait timer so the server holds off redelivering while work continues. Sugar over publish of the `+WPI` protocol payload to the message's ack subject (ADR 0019), shaped exactly as `ack`: synchronous, returns nil, and throws `:type :no-ack-subject` on a message without an ack subject. Unlike its terminal siblings, it is a REPEATABLE progress signal: send it again whenever the deadline nears.
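
One way to send the signal repeatedly is to run the work on another thread and emit a progress signal on a fixed interval until it finishes. The sketch below is JVM-only (it relies on `future` and a timed `deref`) and makes no claim about how the library is meant to be driven. `run-with-progress` and its parameters are hypothetical; in real use `signal-fn` would be something like `#(jet/working conn msg)`.

```clojure
(defn run-with-progress
  "Run task-fn on another thread, calling signal-fn every interval-ms
  until the task completes. Returns the task's result; if the task throws,
  the timed deref rethrows it wrapped in an ExecutionException."
  [signal-fn interval-ms task-fn]
  (let [result (future (task-fn))]
    (loop []
      ;; Wait up to interval-ms for the result; ::pending marks a timeout.
      (let [v (deref result interval-ms ::pending)]
        (if (= v ::pending)
          (do (signal-fn) (recur)) ; still working: signal and keep waiting
          v)))))
```

Choose `interval-ms` comfortably shorter than the consumer's ack wait so that each signal lands before the deadline it is meant to postpone.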