Clojure wrapper over Netflix concurrency-limits.
Core concepts:
limiter controls inflight concurrency using an adaptive algorithm.acquire! attempts to get a token; returns a listener map on success, nil on rejection.success!, ignore!, or dropped!.with-limit handles the acquire/finalize lifecycle automatically.Clojure wrapper over Netflix concurrency-limits. Core concepts: - A `limiter` controls inflight concurrency using an adaptive algorithm. - Calling `acquire!` attempts to get a token; returns a listener map on success, nil on rejection. - The listener must be finalized with `success!`, `ignore!`, or `dropped!`. - `with-limit` handles the acquire/finalize lifecycle automatically.
(acquire! limiter)(acquire! limiter context)Attempts to acquire a concurrency token from limiter.
Returns a Limiter$Listener on success, or nil if the limit is exceeded.
You MUST call one of success!, ignore!, or dropped! on the
returned listener when the work completes.
Attempts to acquire a concurrency token from `limiter`. Returns a `Limiter$Listener` on success, or nil if the limit is exceeded. You MUST call one of `success!`, `ignore!`, or `dropped!` on the returned listener when the work completes.
(aimd-limit {:keys [initial-limit min-limit max-limit backoff-ratio
timeout-ns]})Creates an AIMDLimit - additive increase / multiplicative decrease. Good for client-side limiting or when drop events are the signal.
Options: :initial-limit (int, default 20) :min-limit (int, default 20) :max-limit (int, default 200) :backoff-ratio (double 0.5-1.0, default 0.9) :timeout-ns (long nanoseconds)
Creates an AIMDLimit - additive increase / multiplicative decrease. Good for client-side limiting or when drop events are the signal. Options: :initial-limit (int, default 20) :min-limit (int, default 20) :max-limit (int, default 200) :backoff-ratio (double 0.5-1.0, default 0.9) :timeout-ns (long nanoseconds)
(attempt! limiter
f
&
{:keys [context on-reject classify]
:or {classify (constantly :success)}})Acquires a token from limiter and calls f.
Options: :context Passed to the limiter's acquire; defaults to nil.
:on-reject (fn [] -> any) called when the limit is exceeded. Defaults to throwing an ex-info with :type ::s-exp.flux/limit-exceeded.
:classify (fn [result] -> :success | :ignore | :dropped)
Maps the return value of f to a limiter signal.
Defaults to always :success.
Returns the return value of f (or on-reject) on success, or throws if
rejected and no on-reject is provided.
Acquires a token from `limiter` and calls `f`.
Options:
:context Passed to the limiter's acquire; defaults to nil.
:on-reject (fn [] -> any) called when the limit is exceeded.
Defaults to throwing an ex-info with :type ::s-exp.flux/limit-exceeded.
:classify (fn [result] -> :success | :ignore | :dropped)
Maps the return value of `f` to a limiter signal.
Defaults to always :success.
Returns the return value of `f` (or `on-reject`) on success, or throws if
rejected and no `on-reject` is provided.(blocking-limiter limiter & {:keys [timeout-ms]})Wraps any limiter to block the calling thread when the limit is reached, rather than rejecting immediately.
The thread waits until a slot becomes available or the timeout expires.
On timeout or interrupt, acquire! returns nil.
Options: :timeout-ms (long) maximum time to block in milliseconds. Defaults to 1 hour. Must be less than 1 hour.
Wraps any limiter to block the calling thread when the limit is reached,
rather than rejecting immediately.
The thread waits until a slot becomes available or the timeout expires.
On timeout or interrupt, `acquire!` returns nil.
Options:
:timeout-ms (long) maximum time to block in milliseconds.
Defaults to 1 hour. Must be less than 1 hour.(dropped! listener)Signal that the request was dropped externally (timeout, upstream rejection). Loss-based algorithms will typically respond with a limit reduction.
Signal that the request was dropped externally (timeout, upstream rejection). Loss-based algorithms will typically respond with a limit reduction.
(fixed-limit {:keys [limit]})Creates a FixedLimit - non-adaptive, static concurrency cap.
Options: :limit (int, required)
Creates a FixedLimit - non-adaptive, static concurrency cap. Options: :limit (int, required)
(gradient2-limit {:keys [initial-limit min-limit max-concurrency smoothing
rtt-tolerance long-window queue-size]})Creates a Gradient2Limit - tracks divergence between exponential averages.
Options: :initial-limit (int, default 20) :min-limit (int, default 20) :max-concurrency (int, default 200) :smoothing (double 0.0-1.0, default 0.2) :rtt-tolerance (double >= 1.0, default 1.5) :long-window (int ms, default 600) :queue-size (int)
Creates a Gradient2Limit - tracks divergence between exponential averages. Options: :initial-limit (int, default 20) :min-limit (int, default 20) :max-concurrency (int, default 200) :smoothing (double 0.0-1.0, default 0.2) :rtt-tolerance (double >= 1.0, default 1.5) :long-window (int ms, default 600) :queue-size (int)
(ignore! listener)Signal that the operation failed before producing meaningful timing data (e.g. validation error, auth failure). The RTT sample is discarded so it does not skew the algorithm.
Signal that the operation failed before producing meaningful timing data (e.g. validation error, auth failure). The RTT sample is discarded so it does not skew the algorithm.
(lifo-blocking-limiter limiter
&
{:keys [backlog-size backlog-timeout-ms
backlog-timeout-fn]})Wraps any limiter with LIFO (last-in, first-out) blocking semantics.
When the limit is reached, incoming threads are queued. The most recently queued thread is unblocked first, which favours availability over latency: the oldest waiting requests shed first, keeping the queue fresh.
Options: :backlog-size (int) maximum number of threads that may block waiting; excess requests are rejected immediately. Default: 100. :backlog-timeout-ms (long) fixed timeout in milliseconds for queued threads. Default: 1000 ms. :backlog-timeout-fn (fn [context] -> long ms) derives the timeout dynamically from the acquire context. When provided, takes precedence over :backlog-timeout-ms.
Wraps any limiter with LIFO (last-in, first-out) blocking semantics.
When the limit is reached, incoming threads are queued. The most recently
queued thread is unblocked first, which favours availability over latency:
the oldest waiting requests shed first, keeping the queue fresh.
Options:
:backlog-size (int) maximum number of threads that may block waiting;
excess requests are rejected immediately. Default: 100.
:backlog-timeout-ms (long) fixed timeout in milliseconds for queued threads.
Default: 1000 ms.
:backlog-timeout-fn (fn [context] -> long ms) derives the timeout
dynamically from the acquire context. When provided,
takes precedence over :backlog-timeout-ms.(partitioned-limiter limiter partition-by partitions)Wraps an AbstractLimiter to enforce per-partition admission control.
The total adaptive limit is divided among named partitions according to fixed ratios. Each partition's slot budget is:
floor(current-total-limit × partition-ratio)
Requests that resolve to a known partition are admitted only when that partition's inflight count is below its budget. Requests that resolve to nil (or an unknown partition key) are admitted only when there is spare capacity not consumed by any partition (i.e. unpartitioned overflow).
The underlying limiter still enforces the global total; partitioning adds a per-partition admission gate on top.
Arguments: limiter - an AbstractLimiter instance (e.g. from simple-limiter) partition-by - (fn [context] -> partition-key | nil) Called with the context passed to acquire!. Return value is looked up in partitions. partitions - map of partition-key -> ratio (0.0–1.0). Ratios should sum to ≤ 1.0.
Example: (partitioned-limiter (simple-limiter (vegas-limit {:max-concurrency 100})) (fn [ctx] (get-in ctx [:headers "x-tier"])) {"live" 0.8 "batch" 0.1})
Wraps an AbstractLimiter to enforce per-partition admission control.
The total adaptive limit is divided among named partitions according to
fixed ratios. Each partition's slot budget is:
floor(current-total-limit × partition-ratio)
Requests that resolve to a known partition are admitted only when that
partition's inflight count is below its budget. Requests that resolve to
nil (or an unknown partition key) are admitted only when there is spare
capacity not consumed by any partition (i.e. unpartitioned overflow).
The underlying limiter still enforces the global total; partitioning adds
a per-partition admission gate on top.
Arguments:
limiter - an AbstractLimiter instance (e.g. from simple-limiter)
partition-by - (fn [context] -> partition-key | nil)
Called with the context passed to acquire!.
Return value is looked up in partitions.
partitions - map of partition-key -> ratio (0.0–1.0).
Ratios should sum to ≤ 1.0.
Example:
(partitioned-limiter
(simple-limiter (vegas-limit {:max-concurrency 100}))
(fn [ctx] (get-in ctx [:headers "x-tier"]))
{"live" 0.8
"batch" 0.1})(simple-limiter limit & {:keys [name]})Creates a SimpleLimiter wrapping a limit algorithm.
limit is a limit instance (vegas-limit, gradient2-limit, aimd-limit, fixed-limit).
Options: :name (string) name for metrics
Creates a SimpleLimiter wrapping a limit algorithm. `limit` is a limit instance (vegas-limit, gradient2-limit, aimd-limit, fixed-limit). Options: :name (string) name for metrics
(success! listener)Signal that the guarded operation completed successfully. The measured latency will be used to tune the limit algorithm.
Signal that the guarded operation completed successfully. The measured latency will be used to tune the limit algorithm.
(vegas-limit {:keys [initial-limit max-concurrency smoothing probe-multiplier]})Creates a VegasLimit - a delay-based adaptive algorithm.
Options: :initial-limit (int, default 20) :max-concurrency (int, default 1000) :smoothing (double 0.0-1.0, default 1.0) :probe-multiplier (int, default 30)
Creates a VegasLimit - a delay-based adaptive algorithm. Options: :initial-limit (int, default 20) :max-concurrency (int, default 1000) :smoothing (double 0.0-1.0, default 1.0) :probe-multiplier (int, default 30)
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |