Carmine-backed Clojure message queue. All heavy lifting by Redis. Message circle architecture used here is simple, reliable, and has reasonable throughput but at best mediocre latency.
Redis keys:
* carmine:mq:<qname>:messages - hash, {mid mcontent}.
* carmine:mq:<qname>:locks - hash, {mid lock-expiry-time}.
* carmine:mq:<qname>:backoffs - hash, {mid backoff-expiry-time}.
* carmine:mq:<qname>:nattempts - hash, {mid attempt-count}.
* carmine:mq:<qname>:mid-circle - list, rotating list of mids.
* carmine:mq:<qname>:done - set, awaiting gc, requeue, etc.
* carmine:mq:<qname>:requeue - set, for `allow-requeue?` option.
* carmine:mq:<qname>:eoq-backoff? - ttl flag, used for queue-wide (every-worker) polling backoff.
* carmine:mq:<qname>:ndry-runs - int, number of times worker(s) have burnt through queue w/o work to do.
Ref. http://antirez.com/post/250 for basic implementation details.
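To make the key naming scheme concrete, here is a minimal sketch that builds these key names as plain strings. `queue-key` is a hypothetical helper written for illustration; it is not part of Carmine's API, and Carmine may construct its keys differently internally.

```clojure
;; Hypothetical helper (not part of Carmine): builds the Redis key name
;; for one part of a named queue, following the scheme listed above.
(defn queue-key
  [qname part]
  (str "carmine:mq:" qname ":" (name part)))

(queue-key "my-queue" :messages)     ; => "carmine:mq:my-queue:messages"
(queue-key "my-queue" :eoq-backoff?) ; => "carmine:mq:my-queue:eoq-backoff?"
```

Such names can be handy when inspecting a queue by hand, e.g. with `redis-cli`.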
IMPLEMENTATION DETAIL: Use `worker` instead.
Rotates queue's mid-circle and processes next mid. Returns:
  nil - If msg GC'd, locked, or set to backoff.
  "eoq-backoff" - If circle uninitialized or end-of-circle marker reached.
  [<mid> <mcontent> <attempt>] - If message should be (re)handled now.
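The three return cases can be illustrated with a small in-memory model. This is only a simplified sketch under stated assumptions: the state map, the `end-of-circle` marker value, and `dequeue-sketch` itself are hypothetical, and the real operation runs atomically inside Redis and also handles GC, requeueing, and the queue-wide backoff flag, none of which is modelled here.

```clojure
;; Hypothetical marker value for this sketch only.
(def end-of-circle "end-of-circle")

(defn dequeue-sketch
  "Rotates the circle (a vector of mids) by moving its tail to its head,
  then inspects the rotated mid. Returns [new-state reply]."
  [{:keys [circle messages locks backoffs] :as state} now-ms lock-ms]
  (if (empty? circle)
    [state "eoq-backoff"]                       ; circle uninitialized
    (let [mid   (peek circle)                   ; tail of the vector
          state (assoc state :circle (into [mid] (pop circle)))]
      (cond
        (= mid end-of-circle)           [state "eoq-backoff"]
        (not (contains? messages mid))  [state nil] ; already GC'd
        (> (get locks mid 0) now-ms)    [state nil] ; locked by a handler
        (> (get backoffs mid 0) now-ms) [state nil] ; still backing off
        :else
        (let [state (-> state
                        (assoc-in [:locks mid] (+ now-ms lock-ms))
                        (update-in [:nattempts mid] (fnil inc 0)))]
          [state [mid (get messages mid) (get-in state [:nattempts mid])]])))))
```

Calling this repeatedly on `{:circle ["end-of-circle" "m1"] :messages {"m1" "hello"}}` first yields `["m1" "hello" 1]`, then `"eoq-backoff"`, then `nil` for as long as the lock on `"m1"` has not expired.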
Pushes given message (any Clojure datatype) to named queue and returns unique message id or {:carmine.mq/error <message-status>}. Options:
* unique-message-id - Specify an explicit message id (e.g. message hash) to perform a de-duplication check. If unspecified, a unique id will be auto-generated.
* allow-requeue? - When true, allow buffered escrow-requeue for a message in the :locked or :done-with-backoff state.
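Since the reply is either a message id or an error map, callers usually need to tell the two apart. The following is a minimal sketch of one way to do that; `enqueue-result` is a hypothetical helper, and the commented usage assumes the function documented here is `enqueue` in `taoensso.carmine.message-queue` (aliased as `car-mq`), called inside Carmine's `wcar` against a Redis server reachable with default connection options.

```clojure
;; Hypothetical helper: interprets the reply described above.
(defn enqueue-result
  [reply]
  (if (and (map? reply) (contains? reply :carmine.mq/error))
    {:ok? false :status (:carmine.mq/error reply)}
    {:ok? true  :mid reply}))

(enqueue-result "some-mid")                  ; => {:ok? true, :mid "some-mid"}
(enqueue-result {:carmine.mq/error :locked}) ; => {:ok? false, :status :locked}

(comment
  ;; Not evaluated. Assumes [taoensso.carmine :as car] and
  ;; [taoensso.carmine.message-queue :as car-mq] have been required.
  (enqueue-result
    (car/wcar {} (car-mq/enqueue "my-queue" {:task :send-email}))))
```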
(exp-backoff n-attempt)
(exp-backoff n-attempt {:keys [min max factor] :or {factor 1000}})
Returns binary exponential backoff value for n<=36.
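For readers unfamiliar with binary exponential backoff, here is a minimal sketch of the general technique using the same option names as the signature above. `exp-backoff-sketch` is a hypothetical stand-in, not Carmine's implementation; in particular, the random jitter and the way `min` and `max` are applied are assumptions.

```clojure
;; Hypothetical re-implementation for illustration only.
(defn exp-backoff-sketch
  ([n-attempt] (exp-backoff-sketch n-attempt {}))
  ([n-attempt {:keys [min max factor] :or {factor 1000}}]
   (let [binary-exp (Math/pow 2 (dec n-attempt))   ; 1, 2, 4, 8, ...
         ;; Pick a point between half and all of the full window (jitter).
         ms (long (* 0.5 factor (+ binary-exp (rand binary-exp))))
         ;; `min`/`max` are shadowed by the options, so qualify the core fns.
         ms (if min (clojure.core/max min ms) ms)
         ms (if max (clojure.core/min max ms) ms)]
     ms)))

(exp-backoff-sketch 1)              ; roughly 500 to 1000 ms
(exp-backoff-sketch 4)              ; roughly 4000 to 8000 ms
(exp-backoff-sketch 10 {:max 5000}) ; => 5000 (capped)
```

The window doubles with each attempt, so a cap via `max` is usually worth setting for long-lived retries.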
(handle1 conn-opts qname handler [mid mcontent attempt :as poll-reply])
Implementation detail!
(make-dequeue-worker pool spec & {:keys [handler-fn handler-ttl-msecs backoff-msecs throttle-msecs auto-start?]})
DEPRECATED: Use `worker` instead.
Returns current message status, one of:
  :queued - Awaiting handler.
  :queued-with-backoff - Awaiting rehandling.
  :locked - Currently with handler.
  :locked-with-requeue - Currently with handler, will requeue on success.
  :done-awaiting-gc - Finished handling, awaiting GC.
  :done-with-backoff - Finished handling, awaiting dedupe timeout.
  nil - Already GC'd or invalid message id.
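Callers often only care whether a message is still in flight or finished. The sketch below groups the documented status values accordingly; `describe-status` and `in-flight-statuses` are hypothetical names, and the grouping is one reasonable reading of the statuses rather than something Carmine defines.

```clojure
;; Hypothetical grouping of the documented status keywords.
(def in-flight-statuses
  #{:queued :queued-with-backoff :locked :locked-with-requeue})

(defn describe-status
  [status]
  (cond
    (nil? status)                                    :unknown-or-gcd
    (in-flight-statuses status)                      :pending
    (#{:done-awaiting-gc :done-with-backoff} status) :done
    :else                                            :unrecognized))

(describe-status :locked)            ; => :pending
(describe-status :done-with-backoff) ; => :done
(describe-status nil)                ; => :unknown-or-gcd
```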
(monitor-fn qname max-circle-size warn-backoff-ms)
Returns a worker monitor fn that warns when queue's mid-circle exceeds the prescribed size. A backoff timeout can be provided to rate-limit this warning.
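A custom monitor with the same behaviour can be sketched in a few lines. This is an illustration of the idea, not Carmine's code: `monitor-sketch` and its `warn!` callback are hypothetical, and only the `:mid-circle-size` key of the monitor argument is used.

```clojure
;; Hypothetical monitor factory. `warn!` is a 1-arg callback (e.g. a logger).
(defn monitor-sketch
  [max-circle-size warn-backoff-ms warn!]
  (let [last-warn-ms (atom nil)]
    (fn [{:keys [mid-circle-size]}]
      (let [now  (System/currentTimeMillis)
            prev @last-warn-ms]
        (when (and (> mid-circle-size max-circle-size)
                   ;; Rate limit: warn at most once per backoff window.
                   (or (nil? prev) (>= (- now prev) warn-backoff-ms))
                   ;; Only one thread wins the right to warn.
                   (compare-and-set! last-warn-ms prev now))
          (warn! mid-circle-size))))))

(def example-monitor
  (monitor-sketch 1000 (* 6 60 60 1000)
                  #(println "Queue circle size is" % "(over limit)")))
```

A function like `example-monitor` could then be supplied as the `:monitor` option of `worker`.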
(worker conn-opts qname &
        [{:keys [handler monitor lock-ms eoq-backoff-ms nthreads throttle-ms
                 auto-start]
          :as opts
          :or {handler (fn [args] (timbre/infof "%s" args) {:status :success})
               monitor (monitor-fn qname 1000 (enc/ms :hours 6))
               lock-ms (enc/ms :hours 1)
               nthreads 1
               throttle-ms 200
               eoq-backoff-ms exp-backoff
               auto-start true}}])
Returns a threaded worker to poll for and handle messages `enqueue`'d to named queue. Options:
  :handler        - (fn [{:keys [qname mid message attempt]}]) that throws an
                    exception or returns {:status     <#{:success :error :retry}>
                                          :throwable  <Throwable>
                                          :backoff-ms <retry-or-dedupe-backoff-ms>}.
  :monitor        - (fn [{:keys [mid-circle-size ndry-runs poll-reply]}])
                    called on each worker loop iteration. Useful for queue
                    monitoring/logging. See also `monitor-fn`.
  :lock-ms        - Max time handler may keep a message before the handler is
                    considered fatally stalled and the message is re-queued.
                    Must be sufficiently high to prevent double handling.
  :eoq-backoff-ms - Thread sleep period each time end of queue is reached.
                    Can be a (fn [ndry-runs]) -> ms, in which case n<=5 will
                    be used. Sleep synchronized for all queue workers.
  :nthreads       - Number of synchronized worker threads to use.
  :throttle-ms    - Thread sleep period between each poll.
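As an illustration of the `:handler` contract, here is a minimal sketch of a handler that exercises all three statuses. `example-handler` and the `:flaky?` message field are hypothetical; the commented usage assumes `taoensso.carmine.message-queue` is aliased as `car-mq` and that Redis is reachable with default connection options.

```clojure
;; Hypothetical handler following the documented return shape.
(defn example-handler
  [{:keys [mid message attempt]}]
  (cond
    ;; Permanent failure: report an error with a Throwable.
    (not (map? message))
    {:status :error
     :throwable (ex-info "Unsupported message shape" {:mid mid})}

    ;; Transient failure: retry with a growing backoff, then give up.
    (:flaky? message)
    (if (< attempt 3)
      {:status :retry :backoff-ms (* 1000 attempt)}
      {:status :error
       :throwable (ex-info "Giving up after 3 attempts" {:mid mid})})

    :else
    {:status :success}))

(comment
  ;; Not evaluated. Assumes [taoensso.carmine.message-queue :as car-mq].
  (def my-worker
    (car-mq/worker {} "my-queue"
                   {:handler     example-handler
                    :nthreads    2
                    :throttle-ms 200})))
```

Returning `:retry` with an explicit `:backoff-ms` keeps retry pacing in the handler's hands, while `:lock-ms` remains the safety net for handlers that never return.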