Carmine-backed Clojure message queue, v2.
All heavy lifting by Redis.
Uses an optimized message circle architecture that is simple, reliable,
and has pretty good throughput and latency.
See `mq-diagram.svg` in repo for diagram of architecture.
Ref. http://antirez.com/post/250 for initial inspiration.
Message status e/o:
nil - Not in queue or already GC'd
:queued - Awaiting handler
:queued-with-backoff - Awaiting handler, but skip until backoff expired
:locked - Currently with handler
:locked-with-requeue - Currently with handler, will requeue when done
:done-awaiting-gc - Finished handling, awaiting GC
:done-with-backoff - Finished handling, awaiting GC,
but skip until dedupe backoff expired
:done-with-requeue - Will requeue, but skip until dedupe backoff expired
Redis keys (all prefixed with `carmine:mq:<qname>:`):
* messages - hash: {mid mcontent} ; Message content
* messages-rq - hash: {mid mcontent} ; '' for requeues
* lock-times - hash: {mid lock-ms} ; Optional mid-specific lock duration
* lock-times-rq - hash: {mid lock-ms} ; '' for requeues
* udts - hash: {mid udt-first-enqueued}
* locks - hash: {mid lock-expiry-time} ; Active locks
* backoffs - hash: {mid backoff-expiry-time} ; Active backoffs
* nattempts - hash: {mid attempt-count}
* done - mid set: awaiting gc, etc.
* requeue - mid set: awaiting requeue ; Deprecated
* mids-ready - list: mids for immediate handling (push to left, pop from right)
* mid-circle - list: mids for maintenance processing (push to left, pop from right)
* ndry-runs - int: num times worker(s) have lapped queue w/o work to do
* isleep-a - list: 0/1 sentinel element for `interruptible-sleep`
* isleep-b - list: 0/1 sentinel element for `interruptible-sleep`

(clear-queues conn-opts qnames)
Permanently deletes ALL content for the Carmine message queues with given names.
Returns nil, or a non-empty vector of the queue names that were cleared.

(default-throttle-ms-fn queue-size)
Default/example (fn [queue-size]) -> ?throttle-msecs
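
The shape `(fn [queue-size]) -> ?throttle-msecs` can be illustrated with a small sketch. The name `example-throttle-ms-fn` and all thresholds below are hypothetical, and reading the `?` prefix as "may be nil" is an assumption; this is not the library's default fn.

```clojure
(defn example-throttle-ms-fn
  "Hypothetical throttle fn: returns a sleep period in msecs that shrinks
  as the queue grows, or nil (no throttling) for very large queues."
  [queue-size]
  (cond
    (> queue-size 1000) nil  ; Large backlog: no sleep between polls
    (> queue-size 100)  10
    (> queue-size 10)   50
    :else               200)) ; Near-empty queue: poll less often
```

A fn of this shape could be supplied as the worker's `:throttle-ms` option in place of `:auto`.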

(enqueue qname message)
(enqueue qname message {:keys [init-backoff-ms lock-ms mid can-update? can-requeue?]})
Pushes given message (any Clojure data type) to named queue and returns
a map with keys: [success? mid action error].
When `success?` is true: `mid`, `action` will be present, with
`action` e/o #{:added :updated}.
When `success?` is false: `error` will be present, with
`error` e/o #{:already-queued :locked :backoff}.
Options:
  :init-backoff-ms - Optional initial backoff in msecs.
  :lock-ms         - Optional lock time in msecs. When unspecified, the
                     worker's default lock time will be used.
  :mid             - Optional unique message id (e.g. message hash) to
                     identify a specific message for dedupe/update/requeue.
                     When unspecified, a random uuid will be used.
  :can-update?     - When true, will update message content and/or lock-ms for
                     an mid still awaiting handling.
  :can-requeue?    - When true, will mark message with `:locked` or
                     `:done-with-backoff` status so that it will be
                     automatically requeued after garbage collection.

(exp-backoff n-attempt)
(exp-backoff n-attempt {:keys [min max factor] :or {factor 1000}})
Returns binary exponential backoff value for n<=36.
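
As a rough illustration of what "binary exponential backoff" means here, the sketch below computes `factor * 2^n` msecs with optional clamping. It is an assumption-laden stand-in, not the library's implementation (which may differ, for example by adding jitter). The name `binary-exp-backoff-ms` and the option keys `:min-ms` and `:max-ms` are hypothetical.

```clojure
(defn binary-exp-backoff-ms
  "Illustrative only: deterministic binary exponential backoff,
  factor * 2^n msecs, optionally clamped to [min-ms, max-ms].
  n is capped at 36 so that the result stays well within a long."
  [n-attempt & [{:keys [min-ms max-ms factor] :or {factor 1000}}]]
  (let [n (min (long n-attempt) 36)
        t (* (long factor) (bit-shift-left 1 n)) ; factor * 2^n
        t (if min-ms (max t (long min-ms)) t)    ; Apply lower bound, if any
        t (if max-ms (min t (long max-ms)) t)]   ; Apply upper bound, if any
    t))
```

With the default factor of 1000, attempt 3 yields `(* 1000 8)` msecs, and passing `{:max-ms 5000}` would cap that at 5000.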

Implementation detail.
(start this)
(stop this)

(make-dequeue-worker pool spec & {:keys [handler-fn handler-ttl-msecs backoff-msecs throttle-msecs auto-start?]})
DEPRECATED: Use `worker` instead.

(message-status qname mid)
Returns current message status, e/o:
nil - Not in queue or already GC'd
:queued - Awaiting handler
:queued-with-backoff - Awaiting handler, but skip until backoff expired
:locked - Currently with handler
:locked-with-requeue - Currently with handler, will requeue when done
:done-awaiting-gc - Finished handling, awaiting GC
:done-with-backoff - Finished handling, awaiting GC,
but skip until dedupe backoff expired
:done-with-requeue - Will requeue, but skip until dedupe backoff expired

(monitor-fn qname max-queue-size warn-backoff-ms)
Returns a worker monitor fn that warns when queue exceeds the prescribed size.
A backoff timeout can be provided to rate-limit this warning.
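
The idea of a size-triggered, rate-limited warning can be sketched in plain Clojure. This is a hypothetical stand-in and not the library's `monitor-fn`: the name `example-monitor-fn` and the `warn-fn` callback are assumptions, and the fn takes a map with a `:queue-size` key only because that is the argument shape documented for the worker's `:monitor` option.

```clojure
(defn example-monitor-fn
  "Hypothetical monitor: calls (warn-fn queue-size) when queue-size exceeds
  max-queue-size, at most once per warn-backoff-ms."
  [max-queue-size warn-backoff-ms warn-fn]
  (let [last-warned-at_ (atom nil)] ; Msecs timestamp of last warning, or nil
    (fn [{:keys [queue-size]}]
      (when (> queue-size max-queue-size)
        (let [now  (System/currentTimeMillis)
              prev @last-warned-at_]
          ;; Warn only if the backoff has elapsed, and only if this thread
          ;; wins the race to record the new timestamp.
          (when (and (or (nil? prev) (>= (- now prev) warn-backoff-ms))
                     (compare-and-set! last-warned-at_ prev now))
            (warn-fn queue-size)))))))
```

For instance, `(example-monitor-fn 1000 (* 6 60 60 1000) println)` would print the queue size at most once every six hours while the queue holds more than 1000 messages.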

(queue-content conn-opts qname)
Returns detailed {<mid> {:keys [message status ...]}} map for every
message currently in queue.
O(n_mids) and expensive, avoid use in production.

(queue-names conn-opts)
(queue-names conn-opts pattern)
Returns a non-empty set of existing queue names, or nil.

(queue-size conn-opts qname)
Returns in O(1) the approx number of messages awaiting handler for given named queue.
Same as (:nqueued (queue-status conn-opts qname)).

(queue-status conn-opts qname)
Returns in O(1) the approx {:keys [nqueued nlocked nbackoff ntotal]}
counts for given named queue.
`nlocked` and `nbackoff` may include expired entries!

(queues-clear!! conn-opts qnames)
Permanently deletes ALL content for the Carmine message queues with given names.
Returns nil, or a non-empty vector of the queue names that were cleared.

(queues-clear-all!!! conn-opts)
**DANGER**! Permanently deletes ALL content for *ALL* Carmine message queues.
Returns nil, or a non-empty vector of the queue names that were cleared.

(set-min-log-level! level)
Sets Timbre's minimum log level for internal Carmine message queue namespaces.
Possible levels: #{:trace :debug :info :warn :error :fatal :report}.
Default level: `:info`.

(worker conn-opts qname)
(worker conn-opts qname
  {:keys [handler monitor lock-ms eoq-backoff-ms throttle-ms auto-start
          nthreads-worker nthreads-handler]
   :as worker-opts
   :or {handler          (fn [m] (timbre/info m) {:status :success})
        monitor          (monitor-fn qname 1000 (enc/ms :hours 6))
        lock-ms          (enc/ms :mins 60)
        nthreads-worker  1
        nthreads-handler 1
        throttle-ms      :auto
        eoq-backoff-ms   exp-backoff
        auto-start       true}})
Returns a stateful threaded CarmineMessageQueueWorker to handle messages
added to named queue with `enqueue`.
API:
- (deref <worker>) => Status map, {:keys [running? nthreads stats ...]}.
- (<worker> :start) => Same as calling (start <worker>).
- (<worker> :stop) => Same as calling (stop <worker>).
- (<worker> :queue-size) => Same as calling `queue-size` for given qname.
- (<worker> :queue-status) => Same as calling `queue-status` for given qname.
- (<worker> :queue-content) => Same as calling `queue-content` for given qname.
Options:
  :handler          - (fn [{:keys [qname mid message attempt]}]) that throws
                      or returns {:status     <#{:success :error :retry}>
                                  :throwable  <Throwable>
                                  :backoff-ms <retry-or-dedupe-backoff-ms>}.
  :monitor          - (fn [{:keys [queue-size ndry-runs poll-reply]}])
                      called on each worker loop iteration. Useful for queue
                      monitoring/logging. See also `monitor-fn`.
  :lock-ms          - Default time that handler may keep a message before handler
                      is considered fatally stalled and message is re-queued. Must be
                      sufficiently high to prevent double handling. Can be
                      overridden on a per-message basis via `enqueue`.
  :throttle-ms      - Thread sleep period between each poll.
                      Can be a (fn [queue-size]) -> ?sleep-msecs,
                      or :auto (to use `default-throttle-ms-fn`).
  :eoq-backoff-ms   - Max msecs to sleep thread each time end of queue is reached.
                      Can be a (fn [ndry-runs]) -> msecs for n<=5.
                      Sleep may be interrupted when new messages are enqueued.
                      If present, connection read timeout should be >= max msecs.
  :nthreads-worker  - Number of threads to monitor and maintain queue.
  :nthreads-handler - Number of threads to handle queue messages with handler fn.
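
The `:handler` contract described above (a fn of a message map that throws or returns a status map) could be sketched as follows. Everything here is hypothetical: `example-handler`, the `process!` argument standing in for application logic, the three-attempt limit, and the assumption that `attempt` counts from 1.

```clojure
(defn example-handler
  "Hypothetical handler. `process!` is a stand-in for application logic
  and is not part of Carmine."
  [process! {:keys [message attempt]}]
  (try
    (process! message)
    {:status :success}
    (catch Exception e
      (if (< attempt 3)
        ;; Early attempts: ask for a retry after a growing backoff
        {:status :retry, :backoff-ms (* 1000 attempt)}
        ;; Later attempts: give up and report the error
        {:status :error, :throwable e}))))
```

A fn such as `(partial example-handler my-process!)`, where `my-process!` is your own fn, has the one-argument shape expected by the `:handler` option.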