Carmine-backed Clojure message queue. All heavy lifting by Redis.
Message circle architecture used here is simple, reliable, and has
reasonable throughput but at best mediocre latency.
Redis keys:
  * carmine:mq:<qname>:messages     - hash, {mid mcontent}.
  * carmine:mq:<qname>:locks        - hash, {mid lock-expiry-time}.
  * carmine:mq:<qname>:backoffs     - hash, {mid backoff-expiry-time}.
  * carmine:mq:<qname>:nattempts    - hash, {mid attempt-count}.
  * carmine:mq:<qname>:mid-circle   - list, rotating list of mids.
  * carmine:mq:<qname>:done         - set, awaiting gc, requeue, etc.
  * carmine:mq:<qname>:requeue      - set, for `allow-requeue?` option.
  * carmine:mq:<qname>:eoq-backoff? - ttl flag, used for queue-wide
                                      (every-worker) polling backoff.
  * carmine:mq:<qname>:ndry-runs    - int, number of times worker(s) have
                                      burnt through queue w/o work to do.
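
The key layout above can be illustrated with a small sketch. `qkey` is a hypothetical helper written for this example only; it is not part of Carmine's API and simply concatenates the documented prefix, queue name, and suffix.

```clojure
;; Hypothetical helper (not part of Carmine): builds a Redis key name
;; following the carmine:mq:<qname>:<suffix> layout listed above.
(defn qkey
  [qname suffix]
  (str "carmine:mq:" (name qname) ":" (name suffix)))

(qkey "my-queue" :messages)     ; => "carmine:mq:my-queue:messages"
(qkey "my-queue" :mid-circle)   ; => "carmine:mq:my-queue:mid-circle"
(qkey "my-queue" :eoq-backoff?) ; => "carmine:mq:my-queue:eoq-backoff?"
```

Seeing the full key names can help when inspecting a queue by hand with a Redis client.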
Ref. http://antirez.com/post/250 for basic implementation details.
(clear-queues conn-opts & qnames)
IMPLEMENTATION DETAIL: Use `worker` instead.
Rotates queue's mid-circle and processes next mid. Returns:
  nil                          - If msg GC'd, locked, or set to backoff.
  "eoq-backoff"                - If circle uninitialized or end-of-circle marker reached.
  [<mid> <mcontent> <attempt>] - If message should be (re)handled now.
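
The three reply shapes described above can be told apart with a simple dispatch. This is a minimal sketch: `classify-poll-reply` and the keywords it returns are hypothetical names chosen for illustration, not part of Carmine.

```clojure
;; Hypothetical helper: classifies a poll reply of the shapes documented above.
(defn classify-poll-reply
  [reply]
  (cond
    ;; nil: message was GC'd, is locked, or is set to backoff
    (nil? reply) :skip
    ;; "eoq-backoff": circle uninitialized or end-of-circle marker reached
    (= reply "eoq-backoff") :end-of-queue
    ;; [<mid> <mcontent> <attempt>]: message should be (re)handled now
    (vector? reply) (let [[mid mcontent attempt] reply]
                      {:mid mid :message mcontent :attempt attempt})
    :else :unknown))

(classify-poll-reply nil)            ; => :skip
(classify-poll-reply "eoq-backoff")  ; => :end-of-queue
(classify-poll-reply ["m1" "hi" 1])  ; => {:mid "m1", :message "hi", :attempt 1}
```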
Pushes given message (any Clojure datatype) to named queue and returns unique
message id or {:carmine.mq/error <message-status>}. Options:
  * unique-message-id  - Specify an explicit message id (e.g. message hash) to
                         perform a de-duplication check. If unspecified, a
                         unique id will be auto-generated.
  * allow-requeue?     - When true, allow buffered escrow-requeue for a
                         message in the :locked or :done-with-backoff state.
(exp-backoff attempt & [{:keys [factor min max] :or {factor 2200}}])
Returns binary exponential backoff value.
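
To make "binary exponential backoff" concrete, here is a minimal sketch that doubles the delay on each attempt and clamps it to optional bounds. `exp-backoff-sketch` is a hypothetical function, and the exact formula is an assumption; the library's own implementation may differ (for example by adding random jitter).

```clojure
;; Hypothetical sketch, NOT Carmine's implementation: the delay doubles with
;; each attempt, starting at `factor` ms, then is clamped to optional bounds.
(defn exp-backoff-sketch
  [attempt & [{:keys [factor min max] :or {factor 2200}}]]
  (let [ms (* factor (Math/pow 2 (dec attempt)))
        ;; `min` and `max` are shadowed by the options, so the core fns
        ;; are referenced by their fully qualified names.
        ms (if min (clojure.core/max min ms) ms)
        ms (if max (clojure.core/min max ms) ms)]
    (long ms)))

(exp-backoff-sketch 1)             ; => 2200
(exp-backoff-sketch 3)             ; => 8800
(exp-backoff-sketch 3 {:max 5000}) ; => 5000
```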
(get-expected-throughput nthreads ?throttle-ms)
Returns ~n msgs handled per second, excluding gc + queue maintenance.
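
As a rough illustration of the estimate, the sketch below assumes each thread polls once per throttle period and that handler time is negligible. `expected-throughput-sketch` is a hypothetical function and the formula is an assumption, not taken from the library source.

```clojure
;; Hypothetical sketch: each of `nthreads` threads is assumed to handle one
;; message per `?throttle-ms` period. Returns nil when no positive throttle
;; is given, since throughput is then not bounded by polling.
(defn expected-throughput-sketch
  [nthreads ?throttle-ms]
  (when (and ?throttle-ms (pos? ?throttle-ms))
    (* nthreads (/ 1000.0 ?throttle-ms))))

(expected-throughput-sketch 1 200) ; => 5.0
(expected-throughput-sketch 4 200) ; => 20.0
(expected-throughput-sketch 4 nil) ; => nil
```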
(handle1 conn-opts qname handler [mid mcontent attempt :as poll-reply])
Implementation detail!
(start this)
(stop this)
(make-dequeue-worker pool
                     spec
                     &
                     {:keys [handler-fn handler-ttl-msecs backoff-msecs
                             throttle-msecs auto-start?]})
DEPRECATED: Use `worker` instead.
Returns current message status, one of:
  :queued               - Awaiting handler.
  :queued-with-backoff  - Awaiting rehandling.
  :locked               - Currently with handler.
  :locked-with-requeue  - Currently with handler, will requeue on success.
  :done-awaiting-gc     - Finished handling, awaiting GC.
  :done-with-backoff    - Finished handling, awaiting dedupe timeout.
  nil                   - Already GC'd or invalid message id.
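
Callers often only care about the broad phase a message is in. The sketch below groups the documented statuses into coarse phases; `status-phase` and the phase keywords are hypothetical names introduced for this example.

```clojure
;; Hypothetical helper: groups the documented statuses into coarse phases.
(defn status-phase
  [status]
  (cond
    (nil? status)                                      :gone        ; GC'd or invalid mid
    (#{:queued :queued-with-backoff} status)           :waiting
    (#{:locked :locked-with-requeue} status)           :in-progress
    (#{:done-awaiting-gc :done-with-backoff} status)   :finished
    :else                                              :unknown))

(status-phase :queued)            ; => :waiting
(status-phase :locked)            ; => :in-progress
(status-phase :done-with-backoff) ; => :finished
(status-phase nil)                ; => :gone
```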
(monitor-fn qname max-circle-size warn-backoff-ms)
Returns a worker monitor fn that warns when queue's mid-circle exceeds the prescribed size. A backoff timeout can be provided to rate-limit this warning.
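
The rate-limited warning described above could be sketched as follows. `size-monitor-sketch` is a hypothetical function, not the library's `monitor-fn`; it takes an explicit `warn!` callback (also an assumption) so the behaviour can be observed without a logging library.

```clojure
;; Hypothetical sketch of a rate-limited size monitor. Returns a fn that
;; accepts the monitor argument map and calls `warn!` when the circle is
;; larger than `max-circle-size`, at most once per `warn-backoff-ms`.
;; Note: the check-then-reset below is not atomic across threads.
(defn size-monitor-sketch
  [max-circle-size warn-backoff-ms warn!]
  (let [last-warned (atom nil)]
    (fn [{:keys [mid-circle-size]}]
      (when (> (or mid-circle-size 0) max-circle-size)
        (let [now  (System/currentTimeMillis)
              prev @last-warned]
          (when (or (nil? prev) (>= (- now prev) warn-backoff-ms))
            (reset! last-warned now)
            (warn! mid-circle-size)))))))

;; Usage: warn via println when the circle holds more than 1000 mids,
;; at most once every 6 hours.
(def my-monitor
  (size-monitor-sketch 1000 (* 1000 60 60 6)
                       #(println "Queue circle too large:" %)))
```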
(queue-status conn-opts qname)
(worker conn-opts
        qname
        &
        [{:keys [handler monitor lock-ms eoq-backoff-ms nthreads throttle-ms
                 auto-start]
          :as opts
          :or {handler (fn [args] (timbre/infof "%s" args) {:status :success})
               monitor (monitor-fn qname 1000 (* 1000 60 60 6))
               lock-ms (encore/ms :hours 1)
               nthreads 1
               throttle-ms 200
               eoq-backoff-ms exp-backoff
               auto-start true}}])
Returns a threaded worker to poll for and handle messages `enqueue`'d to
named queue. Options:
 :handler        - (fn [{:keys [mid message attempt]}]) that throws an ex
                   or returns {:status     <#{:success :error :retry}>
                               :throwable  <Throwable>
                               :backoff-ms <retry-or-dedupe-backoff-ms>}.
 :monitor        - (fn [{:keys [mid-circle-size ndry-runs poll-reply]}])
                   called on each worker loop iteration. Useful for queue
                   monitoring/logging. See also `monitor-fn`.
 :lock-ms        - Max time handler may keep a message before the handler is
                   considered fatally stalled and the message re-queued. Must
                   be sufficiently high to prevent double handling.
 :eoq-backoff-ms - Thread sleep period each time end of queue is reached.
                   Can be a (fn [ndry-runs]) -> ms; n<=5 will be used.
                   Sleep synchronized for all queue workers.
 :nthreads       - Number of synchronized worker threads to use.
 :throttle-ms    - Thread sleep period between each poll.
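
The handler contract above can be illustrated with a small sketch. `make-handler`, `process!`, the three-attempt limit, and the linear backoff are all assumptions made for this example; only the argument keys and the returned `:status`, `:throwable`, and `:backoff-ms` keys come from the documentation above. The worker calls in the `comment` block need Carmine on the classpath and a reachable Redis server, and the `{:pool {} :spec {}}` connection options are placeholders.

```clojure
;; Hypothetical handler factory: `process!` stands in for application logic.
;; Retries with a growing backoff for the first two attempts, then gives up.
(defn make-handler
  [process!]
  (fn [{:keys [message attempt]}]
    (try
      (process! message)
      {:status :success}
      (catch Exception e
        (if (< attempt 3)
          {:status :retry :backoff-ms (* 1000 attempt)}
          {:status :error :throwable e})))))

(comment
  ;; Not evaluated here: requires Carmine and a running Redis server.
  (require '[taoensso.carmine.message-queue :as car-mq])

  (def my-worker
    (car-mq/worker {:pool {} :spec {}} "my-queue"
                   {:handler (make-handler println)}))

  (car-mq/stop my-worker))
```

Keeping the handler a plain function of its argument map makes it easy to exercise without a queue.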