A .cljc handler runs on two very different execution models — jnats delivers on dispatcher threads, nats.js delivers by looping an async iterable on the single-threaded event loop. To keep write-once-run-both honest, the delivery contract is the strongest guarantee true on both:
Serial delivery, ordered per publisher, promise-return backpressure, and overflow signalled as :slow-consumer.

Overflow surfaces as a :slow-consumer Error delivered to that subscription's :on-error when undelivered messages for the subscription cross an optional :max-pending threshold. It never goes to the connection-level :on-status, because it is inherently per-subscription (ADR 0006), and it is silently dropped if the subscription set no :on-error. What :max-pending guarantees portably is the signal, not a hard heap cap. On the JVM, jnats additionally drops over-limit messages from its dispatcher queue, and is bounded by default at 512K msgs / 64 MB even when :max-pending is absent. On ClojureScript, nats-core's buffer is unbounded and :max-pending only sets the notification threshold (slow?); it does not auto-drop, so the consumer must react to :slow-consumer itself (drain, unsubscribe, or slow the source). This is an accepted "shape, not cadence" divergence (ADR 0006): the signal is portable, the drop is native.
Because the signal is the whole point of :max-pending, the native slow-consumer wiring engages only when both :max-pending and :on-error are set, on both legs. With no :on-error there is nothing to signal, so neither the custom threshold nor (on the JVM) the lowered drop point is applied. Each client keeps its own defaults (JVM bounded at jnats' 512K/64 MB, ClojureScript unbounded), rather than the JVM silently dropping harder than default with no notification. :on-error still receives errors from a throwing handler or a decode failure regardless of :max-pending; only the :slow-consumer path depends on the pair.
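The "both keys or neither" rule above can be sketched as a small pure function. This is an illustration only: `slow-consumer-wiring` and `maybe-signal!` are hypothetical names, the `{:type :slow-consumer}` error data is an assumed shape, and treating "cross" as strictly greater than the threshold is also an assumption. None of these are the library's API.

```clojure
(defn slow-consumer-wiring
  "Hypothetical helper: decides whether the native slow-consumer wiring
  would engage for a subscription's option map."
  [{:keys [max-pending on-error]}]
  (if (and max-pending on-error)
    {:engaged? true :threshold max-pending}
    ;; Either key missing: nothing is applied, the client keeps its defaults.
    {:engaged? false}))

(defn maybe-signal!
  "Calls the subscription's :on-error with an assumed :slow-consumer error
  once `pending` exceeds the threshold. Returns true if it signalled."
  [opts pending]
  (let [{:keys [engaged? threshold]} (slow-consumer-wiring opts)]
    (if (and engaged? (> pending threshold))
      (do ((:on-error opts)
           (ex-info "slow consumer" {:type :slow-consumer :pending pending}))
          true)
      false)))
```

With `{:max-pending 10}` alone the wiring stays disengaged, which mirrors the rule that a threshold without an :on-error changes nothing.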
The contract above (serial, ordered-per-publisher, promise-return backpressure, :slow-consumer at :max-pending) is realized by driving each client's own consumption model rather than by an our-layer queue — even though the native way looks like an anti-pattern on the JVM.
On the JVM, onMessage blocks the dispatcher thread on the handler's CompletionStage (no timeout). Serial delivery and one-at-a-time ordering fall out for free and, crucially, backpressure builds in jnats' own dispatcher queue, so its native pending limits (setPendingLimits, 512K/64 MB default) and ErrorListener.slowConsumerDetected actually engage. slowConsumerDetected fires at connection level with only the native Consumer, so a {dispatcher → :on-error} registry on the connection routes it to the originating subscription.

On ClojureScript, the subscription is consumed as an async iterable (not {:callback}), with a detached loop that awaits the handler before pulling the next message. The buffering then lives in nats-core's QueuedIterator, where the iterator-only slow? threshold and setSlowNotificationFn apply (both throw under the callback model).

What is superseded, not reverted, is the mechanism, not the contract. The promise-return backpressure described above is unchanged: a handler that returns a promise still suspends delivery of the next message until that promise settles. What changed is how that contract is realized: the earlier manual tail-chaining (each delivery chained onto the prior handler's settle) is replaced by driving each client's native consumption model. The tail-chain was correct for ordering and backpressure but kept both native queues empty, so neither client's slow-consumer detection could ever trip. Accepted costs: the JVM pins one dispatcher thread per in-flight slow handler; CLJS must wire the detached loop's teardown into drain/close/unsubscribe (handled by letting the iterable complete).
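The JVM leg's "block the dispatcher on the handler's CompletionStage" idea can be sketched in plain JVM Clojure without jnats. This is a minimal model, not the library's code: a single-thread executor stands in for one dispatcher thread, and `deliver-blocking!` and `run-demo` are hypothetical names. The handlers finish in reverse order of cost, so any overlap between them would show up as a reordered log.

```clojure
(ns example.blocking-dispatch
  (:import (java.util.concurrent CompletableFuture CompletionStage
                                 Executors TimeUnit)
           (java.util.function Supplier)))

(defn deliver-blocking!
  "Runs `handler` on the calling (dispatcher) thread. If it returns a
  CompletionStage, blocks that thread until the stage settles (no timeout)."
  [handler msg]
  (let [ret (handler msg)]
    (when (instance? CompletionStage ret)
      (.join (.toCompletableFuture ^CompletionStage ret)))
    nil))

(defn run-demo
  "Delivers 1, 2, 3 through one stand-in dispatcher thread and returns the
  order in which the async handlers completed."
  []
  (let [log        (atom [])
        dispatcher (Executors/newSingleThreadExecutor)
        ;; Message 1 takes longest, message 3 shortest.
        handler    (fn [msg]
                     (CompletableFuture/supplyAsync
                      (reify Supplier
                        (get [_]
                          (Thread/sleep (long (- 40 (* 10 msg))))
                          (swap! log conj msg)
                          msg))))]
    (doseq [msg [1 2 3]]
      (.execute dispatcher ^Runnable (fn [] (deliver-blocking! handler msg))))
    (.shutdown dispatcher)
    (.awaitTermination dispatcher 5 TimeUnit/SECONDS)
    @log))
```

Because the stand-in dispatcher thread is parked inside `deliver-blocking!`, undelivered work accumulates in the executor's queue, which plays the role of the native dispatcher queue where pending limits would apply.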
A Service endpoint handler is an ordinary handler under this same contract, so endpoint delivery is realized the same way — and the CLJS leg must take the iterator fork, not the callback one. jnats' ServiceMessageHandler.onMessage blocks the dispatcher on the handler's CompletionStage, so serial-per-endpoint delivery and promise-return backpressure fall out for free, exactly as a core subscription does. On nats.js the endpoint can be either a callback (addEndpoint(name, {handler})) or an iterator (addEndpoint(name, {}), which returns a QueuedIterator<ServiceMsg>); @nats-io/services 3.4.0 invokes the callback synchronously and does not await its returned promise — verified, not inferred: with the callback wiring a slow async handler did not delay the next request to the same endpoint (entry gap ≈ 0) and the endpoint's processing_time stayed 0 (the callback path stops its latency timer the instant the synchronous callback returns). So the impl omits the handler and drives the endpoint's iterator with a detached await-the-handler-then-.next loop (the same shape as a core subscription's), which puts the awaited handler duration inside the iterator's per-iteration profile timer — the source of an iterator endpoint's processing_time. Of the two ways to reconcile that synchronous callback with this contract — drive the iterator, or narrow the Handler contract on the CLJS leg — the recorded resolution is drive the iterator: the portable Handler contract is unchanged, no narrowing.
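The detached await-the-handler-then-.next loop can be sketched in ClojureScript against any JavaScript async iterator, that is, an object whose `.next` returns a promise of `#js {:done ... :value ...}`. `drive-iterator!` and its `on-error` argument are hypothetical names, and the sketch deliberately calls no nats.js function; how the real impl obtains the iterator and tears the loop down is not shown here.

```clojure
(defn drive-iterator!
  "Pulls one item, awaits the handler's result, then pulls the next.
  Returns a promise that resolves once the iterator reports done."
  [^js iter handler on-error]
  (js/Promise.
   (fn [resolve reject]
     (letfn [(step []
               (-> (.next iter)
                   (.then (fn [^js res]
                            (if (.-done res)
                              (resolve nil)
                              (-> (js/Promise.
                                   ;; A synchronous throw in the handler
                                   ;; becomes a rejection here.
                                   (fn [ok _]
                                     (ok (handler (.-value res)))))
                                  (.catch on-error)
                                  ;; Only now is the next item requested.
                                  (.then (fn [_] (step) nil))))
                            ;; Return nil so promises do not chain up.
                            nil))
                   (.catch reject))
               nil)]
       (step)))))
```

The next `.next` call is issued only after the handler's promise settles, so a slow handler leaves unread items in the iterator's buffer instead of overlapping with the following one. Letting the iterator finish is what ends the loop.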
One accepted consequence: on the iterator path nats.js 3.4.0 never calls countError, so the endpoint's num_errors is the impl's to maintain. A thrown or rejecting handler is counted on the same .catch funnel that auto-replies the 500 (ADR 0025), by locating the endpoint's native stats object off the Service's handler entry — confined to the services impl ns and degrading to no count (never a throw) if nats.js' internal shape ever changes.
Implementing :max-pending/:slow-consumer in our own layer would be a parallel re-implementation of what each client already does (and jnats' native drop could never be honored). Road 2 reuses the native machinery and deletes the tail-chaining.