Carmine-backed Clojure message queue, v2. All heavy lifting by Redis.
Uses an optimized message circle architecture that is simple, reliable, and has pretty good throughput and latency.
See `mq-architecture.svg` in repo for diagram of architecture.
Ref. http://antirez.com/post/250 for initial inspiration.

Message status e/o:
nil - Not in queue or already GC'd
:queued - Awaiting handler
:queued-with-backoff - Awaiting handler, but skip until backoff expired
:locked - Currently with handler
:locked-with-requeue - Currently with handler, will requeue when done
:done-awaiting-gc - Finished handling, awaiting GC
:done-with-backoff - Finished handling, awaiting GC, but skip until dedupe backoff expired
:done-with-requeue - Will requeue, but skip until dedupe backoff expired

Redis keys (all prefixed with `carmine:mq:<qname>:`):
messages - hash: {mid mcontent} ; Message content
messages-rq - hash: {mid mcontent} ; '' for requeues
lock-times - hash: {mid lock-ms} ; Optional mid-specific lock duration
lock-times-rq - hash: {mid lock-ms} ; '' for requeues
udts - hash: {mid udt-first-enqueued}
locks - hash: {mid lock-expiry-time} ; Active locks
backoffs - hash: {mid backoff-expiry-time} ; Active backoffs
nattempts - hash: {mid attempt-count}
done - mid set: awaiting gc, etc.
requeue - mid set: awaiting requeue ; Deprecated
mids-ready - list: mids for immediate handling (push to left, pop from right)
mid-circle - list: mids for maintenance processing (push to left, pop from right)
ndry-runs - int: num times worker(s) have lapped queue w/o work to do
isleep-a - list: 0/1 sentinel element for interruptible-sleep
isleep-b - list: 0/1 sentinel element for interruptible-sleep
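To make the key layout above concrete, here is a minimal sketch of how a full Redis key name could be assembled from a queue name and one of the listed suffixes. `qkey-sketch` is a hypothetical helper written for illustration only; it is not the library's own key function.

```clojure
;; Hypothetical helper (not part of Carmine): builds a full key name
;; following the `carmine:mq:<qname>:` prefix described above.
(defn qkey-sketch
  [qname k]
  (str "carmine:mq:" (name qname) ":" (name k)))

;; Example: the hash holding message content for queue "my-queue"
(qkey-sketch "my-queue" :messages)
```

Keys built this way can be handy when inspecting a queue by hand with a Redis client, assuming the prefix is exactly as documented.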
(default-throttle-ms-fn queue-size)
Default/example (fn [queue-size]) -> ?throttle-msecs
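A custom throttle fn has the same shape, (fn [queue-size]) -> ?throttle-msecs. The sketch below is one possible implementation; the name `throttle-ms-sketch` and all thresholds are illustrative assumptions, not the values used by `default-throttle-ms-fn`.

```clojure
;; Hypothetical throttle fn: sleep less between polls as the backlog grows.
;; Thresholds are made up for illustration, not the library's defaults.
(defn throttle-ms-sketch
  [queue-size]
  (let [n (long (or queue-size 0))]
    (cond
      (> n 1000) nil   ; large backlog: no throttling
      (> n 100)  10
      (> n 10)   50
      :else      200)))
```

Returning nil for large queues follows the `?throttle-msecs` convention, where the leading `?` suggests the value may be absent.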
(enqueue qname message)
(enqueue qname
         message
         {:keys [mid init-backoff-ms lock-ms can-update? can-requeue?
                 reset-init-backoff?]})
Pushes given message (any Clojure data type) to named queue and returns a map with keys: [success? mid action error].

When `success?` is true: `mid`, `action` will be present, with `action` e/o #{:added :updated}.
When `success?` is false: `error` will be present, with `error` e/o #{:already-queued :locked :backoff}.

Options:
`:init-backoff-ms` - Optional initial backoff in msecs.
`:lock-ms` - Optional lock time in msecs. When unspecified, the worker's default lock time will be used.
`:mid` - Optional unique message id (e.g. message hash) to identify a specific message for dedupe/update/requeue. When unspecified, a random uuid will be used.
`:can-update?` - When true, will update message content and/or lock-ms for an mid still awaiting handling.
`:can-requeue?` - When true, will mark message with `:locked` or `:done-with-backoff` status so that it will be automatically requeued after garbage collection.
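The result map described above can be handled with a small pure function. The sketch below is illustrative only: `describe-enqueue-result` is a hypothetical helper, and the commented-out call assumes the usual Carmine pattern of wrapping commands in `wcar` with a default local connection, which this page does not itself confirm.

```clojure
;; Hypothetical helper: turns an `enqueue` result map into a log-friendly string.
(defn describe-enqueue-result
  [{:keys [success? mid action error]}]
  (if success?
    (case action
      :added   (str "Message " mid " added")
      :updated (str "Message " mid " updated")
      (str "Message " mid " enqueued with action " action))
    (case error
      :already-queued "Rejected: mid is already queued"
      :locked         "Rejected: mid is currently locked by a handler"
      :backoff        "Rejected: mid is in backoff"
      (str "Rejected: " error))))

(comment
  ;; Assumed usage; requires Carmine on the classpath and a reachable Redis.
  (require '[taoensso.carmine :as car :refer [wcar]]
           '[taoensso.carmine.message-queue :as car-mq])
  (def conn-opts {}) ; assumption: default local Redis connection
  (describe-enqueue-result
    (wcar conn-opts
      (car-mq/enqueue "my-queue" {:user-id 42}
        {:mid "welcome-email:42" ; explicit mid enables dedupe
         :init-backoff-ms 5000
         :can-update? true}))))
```

Supplying an explicit `:mid` is what makes the `:already-queued`, `:locked`, and `:backoff` errors meaningful, since a random uuid would never collide with an existing message.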
(exp-backoff n-attempt)
(exp-backoff n-attempt {:keys [min max factor] :or {factor 1000}})
Returns binary exponential backoff value for n<=36.
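As a rough illustration of binary exponential backoff, the sketch below doubles the delay for each attempt, scales it by `factor`, and clamps it to optional `min` and `max` bounds. `backoff-ms-sketch` is a hypothetical function: it is deterministic and omits any randomization the library's `exp-backoff` may apply, so its values should not be read as the library's actual output.

```clojure
;; Hypothetical, deterministic sketch of binary exponential backoff.
;; Not the library's implementation.
(defn backoff-ms-sketch
  ([n-attempt] (backoff-ms-sketch n-attempt nil))
  ([n-attempt {:keys [factor] min-ms :min max-ms :max :or {factor 1000}}]
   (let [n (min 36 (max 0 (long n-attempt)))      ; cap exponent, as in "n<=36"
         t (* (long factor) (bit-shift-left 1 n)) ; factor * 2^n
         t (if min-ms (max (long min-ms) t) t)    ; apply lower bound if given
         t (if max-ms (min (long max-ms) t) t)]   ; apply upper bound if given
     t)))

(backoff-ms-sketch 3)              ; => 8000
(backoff-ms-sketch 10 {:max 5000}) ; => 5000
```

Capping the exponent keeps the result within a 64-bit long for the default factor.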
Private, please don't use this.
(start this)
(stop this)
(message-status qname mid)
Returns current message status, e/o:
`nil` - Not in queue or already GC'd
`:queued` - Awaiting handler
`:queued-with-backoff` - Awaiting handler, but skip until backoff expired
`:locked` - Currently with handler
`:locked-with-requeue` - Currently with handler, will requeue when done
`:done-awaiting-gc` - Finished handling, awaiting GC
`:done-with-backoff` - Finished handling, awaiting GC, but skip until dedupe backoff expired
`:done-with-requeue` - Will requeue, but skip until dedupe backoff expired
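Callers often only care about which broad phase a message is in. The sketch below groups the statuses listed above into coarse phases; the function `status-phase` and the phase names are hypothetical, and the grouping is one reading of the status descriptions rather than anything the library defines.

```clojure
;; Hypothetical grouping of the documented statuses into coarse phases.
(def waiting-statuses #{:queued :queued-with-backoff})
(def handling-statuses #{:locked :locked-with-requeue})
(def done-statuses #{:done-awaiting-gc :done-with-backoff :done-with-requeue})

(defn status-phase
  [status]
  (cond
    (nil? status)                        :absent   ; not in queue or already GC'd
    (contains? waiting-statuses status)  :waiting  ; awaiting handler
    (contains? handling-statuses status) :handling ; currently with handler
    (contains? done-statuses status)     :done     ; handled, pending GC or requeue
    :else                                :unknown))
```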
(monitor-fn qname max-queue-size warn-backoff-ms)
Returns a worker monitor fn that warns when queue exceeds the prescribed size. A backoff timeout can be provided to rate-limit this warning.
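The behaviour described above (warn when the queue is too large, but not more often than a backoff period) can be sketched as a closure over the time of the next permitted warning. `size-monitor-sketch`, its `warn!` callback, and its `now-ms-fn` clock argument are hypothetical names introduced here so the example can run without Redis or a logging library; this is not how `monitor-fn` itself is implemented.

```clojure
;; Hypothetical rate-limited size monitor. Returns a fn suitable in shape
;; for a worker's :monitor option: (fn [{:keys [queue-size ...]}]).
(defn size-monitor-sketch
  [max-queue-size warn-backoff-ms warn! now-ms-fn]
  (let [next-warn-at (atom 0)]
    (fn [{:keys [queue-size]}]
      (locking next-warn-at ; keep check-and-update atomic across threads
        (let [now (long (now-ms-fn))]
          (when (and queue-size
                     (> (long queue-size) (long max-queue-size))
                     (>= now (long @next-warn-at)))
            (reset! next-warn-at (+ now (long (or warn-backoff-ms 0))))
            (warn! queue-size)
            nil))))))

(comment
  ;; Assumed usage with a real clock and println as the warning sink.
  (def monitor
    (size-monitor-sketch 1000 (* 6 60 60 1000)
      #(println "Queue too large:" %)
      #(System/currentTimeMillis))))
```

Passing the clock in as a function keeps the rate limiting easy to test with a fake time source.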
(queue-content conn-opts qname)
Returns detailed {<mid> {:keys [message status ...]}} map for every message currently in queue.
O(n_mids) and expensive, avoid use in production.
(queue-names conn-opts)
(queue-names conn-opts pattern)
Returns a non-empty set of existing queue names, or nil. Executes a Redis scan command, so O(n) of the number of keys in db.
(queue-size conn-opts qname)
Returns in O(1) the approx number of messages awaiting handler for given named queue. Same as (:nqueued (queue-status conn-opts qname)).
(queue-status conn-opts qname)
Returns in O(1) the approx {:keys [nqueued nlocked nbackoff ntotal]} counts for given named queue.
`nlocked` and `nbackoff` may include expired entries!
(queues-clear!! conn-opts qnames)
Permanently deletes ALL content for the Carmine message queues with given names.
Returns nil, or a non-empty vector of the queue names that were cleared.
(queues-clear-all!!! conn-opts)
DANGER! Permanently deletes ALL content for ALL Carmine message queues. Returns nil, or a non-empty vector of the queue names that were cleared.
(set-min-log-level! level)
Sets Timbre's minimum log level for internal Carmine message queue namespaces.
Possible levels: #{:trace :debug :info :warn :error :fatal :report}.
Default level: `:info`.
(worker conn-opts qname)
(worker conn-opts
        qname
        {:keys [handler monitor lock-ms eoq-backoff-ms throttle-ms auto-start
                nthreads-worker nthreads-handler]
         :as worker-opts
         :or {handler (fn [m] (timbre/info m) {:status :success})
              monitor (monitor-fn qname 1000 (enc/ms :hours 6))
              lock-ms (enc/ms :mins 60)
              nthreads-worker 1
              nthreads-handler 1
              throttle-ms default-throttle-ms-fn
              eoq-backoff-ms exp-backoff
              auto-start true}})
Returns a stateful threaded `CarmineMessageQueueWorker` to handle messages added to named queue with `enqueue`.

API:
  - (deref <worker>) => Detailed worker status map (see "Debugging" below).
  - (<worker> :start) => Same as calling (start <worker>).
  - (<worker> :stop) => Same as calling (stop <worker>).
  - (<worker> :queue-size) => Same as calling `queue-size` for given qname.
  - (<worker> :queue-status) => Same as calling `queue-status` for given qname.
  - (<worker> :queue-content) => Same as calling `queue-content` for given qname.

Debugging:
  To debug a worker, deref it to see detailed status map with keys:
    `:qname` - Worker's queue name (string)
    `:opts` - Worker's options map
    `:conn-opts` - Worker's connection options map
    `:running?` - Is the worker currently running? (true/false)
    `:nthreads` - {:keys [worker handler]}
    `:stats`
      `:queue-size` - {:keys [last min max mean mad var p50 p90 ...]}
      `:queueing-time-ms` - {:keys [last min max mean mad var p50 p90 ...]}
      `:handling-time-ns` - {:keys [last min max mean mad var p50 p90 ...]}
      `:counts`
        `:handler/success` - Number of handler calls with `:success` status
        `:handler/error` - Number of handler calls with `:error` status
        `:handler/retry` - Number of handler calls with `:retry` status
        `:handler/backoff` - Number of handler calls encountering an mid in backoff
        ...

  See also the `:monitor` option below, and utils:
  `queue-names`, `queue-size`, `queue-content`, `message-status`, etc.

Options:
  `:handler` - (fn [{:keys [qname mid message attempt]}]) called for each worker message.
    Should return a map with possible keys:
      `:status` - ∈ {:success :error :retry}
      `:throwable` - Optional Throwable when relevant
      `:backoff-ms` - Optional time (in milliseconds) to backoff for dedupe or before retrying
    If handler throws ANY `Throwable`, will assume `:error` status and NOT retry.
    For custom error handling, make sure to use an appropriate try/catch within your handler fn!

  `:monitor` - (fn [{:keys [queue-size ndry-runs poll-reply]}])
    Called on each worker loop iteration. Useful for queue monitoring/logging.
    See also `monitor-fn`.

  `:lock-ms` (default 60 minutes)
    Default time (in milliseconds) that handler may keep a message before handler
    is considered fatally stalled and message is re-queued. Must be large enough
    to prevent double handling!
    Can be overridden on a per-message basis via `enqueue`.

  `:throttle-ms` (default `default-throttle-ms-fn`)
    Thread sleep period (in milliseconds) between each poll.
    Can be a (fn [queue-size]) -> ?sleep-msecs.

  `:eoq-backoff-ms` (default `exp-backoff`)
    Max time (in milliseconds) to sleep thread each time end of queue is reached.
    Can be a (fn [ndry-runs]) -> msecs for n<=5.
    Sleep may be interrupted when new messages are enqueued.
    If present, connection read timeout should be >= max msecs!

  `:nthreads-worker` - Number of threads to monitor and maintain queue.
  `:nthreads-handler` - Number of threads to handle queue messages with handler fn.
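Because a thrown `Throwable` is treated as `:error` with no retry, a handler that wants retries has to catch failures itself and return `:retry` explicitly. The sketch below shows one way to do that. `process-message!`, the `:simulate` key, the `:transient?` flag, and the limit of three attempts are all hypothetical; the commented-out worker setup assumes a Redis at the given local URI and that `attempt` counts from 1.

```clojure
;; Hypothetical business logic: fails on demand so the handler can be exercised.
(defn process-message!
  [message]
  (case (:simulate message)
    :transient (throw (ex-info "temporary failure" {:transient? true}))
    :fatal     (throw (ex-info "bad message" {:transient? false}))
    :ok))

;; Hypothetical handler: maps outcomes onto the documented return keys.
(defn handler-sketch
  [{:keys [qname mid message attempt]}]
  (try
    (process-message! message)
    {:status :success}
    (catch clojure.lang.ExceptionInfo e
      (if (and (:transient? (ex-data e))
               (< (long (or attempt 1)) 3)) ; assumed retry limit
        {:status :retry, :throwable e, :backoff-ms 5000}
        {:status :error, :throwable e}))
    (catch Throwable t
      {:status :error, :throwable t})))

(comment
  ;; Assumed usage; requires Carmine on the classpath and a reachable Redis.
  (require '[taoensso.carmine.message-queue :as car-mq])
  (def conn-opts {:pool {} :spec {:uri "redis://localhost:6379"}})
  (def my-worker
    (car-mq/worker conn-opts "my-queue"
      {:handler          handler-sketch
       :lock-ms          (* 5 60 1000) ; must exceed worst-case handling time
       :nthreads-handler 2}))
  @my-worker               ; detailed status map
  (my-worker :queue-size)
  (my-worker :stop))
```

Keeping the handler a plain function of its argument map makes it testable without a running worker.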