Liking cljdoc? Tell your friends :D

taoensso.carmine.message-queue

Carmine-backed Clojure message queue. All heavy lifting by Redis. Message circle architecture used here is simple, reliable, and has reasonable throughput but at best mediocre latency.

Redis keys:

  • carmine:mq:<qname>:messages - hash, {mid mcontent}.
  • carmine:mq:<qname>:locks - hash, {mid lock-expiry-time}.
  • carmine:mq:<qname>:backoffs - hash, {mid backoff-expiry-time}.
  • carmine:mq:<qname>:nattempts - hash, {mid attempt-count}.
  • carmine:mq:<qname>:mid-circle - list, rotating list of mids.
  • carmine:mq:<qname>:done - set, awaiting gc, requeue, etc.
  • carmine:mq:<qname>:requeue - set, for allow-requeue? option.
  • carmine:mq:<qname>:eoq-backoff? - ttl flag, used for queue-wide (every-worker) polling backoff.
  • carmine:mq:<qname>:ndry-runs - int, number of times worker(s) have burnt through queue w/o work to do.

Ref. http://antirez.com/post/250 for basic implementation details

Carmine-backed Clojure message queue. All heavy lifting by Redis.
Message circle architecture used here is simple, reliable, and has
reasonable throughput but at best mediocre latency.

Redis keys:
  * carmine:mq:<qname>:messages     - hash, {mid mcontent}.
  * carmine:mq:<qname>:locks        - hash, {mid lock-expiry-time}.
  * carmine:mq:<qname>:backoffs     - hash, {mid backoff-expiry-time}.
  * carmine:mq:<qname>:nattempts    - hash, {mid attempt-count}.
  * carmine:mq:<qname>:mid-circle   - list, rotating list of mids.
  * carmine:mq:<qname>:done         - set, awaiting gc, requeue, etc.
  * carmine:mq:<qname>:requeue      - set, for `allow-requeue?` option.
  * carmine:mq:<qname>:eoq-backoff? - ttl flag, used for queue-wide
                                      (every-worker) polling backoff.
  * carmine:mq:<qname>:ndry-runs    - int, number of times worker(s) have
                                      burnt through queue w/o work to do.

Ref. http://antirez.com/post/250 for basic implementation details
raw docstring

clear-queuesclj

(clear-queues conn-opts & qnames)
source

dequeueclj

IMPLEMENTATION DETAIL: Use worker instead. Rotates queue's mid-circle and processes next mid. Returns: nil - If msg GC'd, locked, or set to backoff. "eoq-backoff" - If circle uninitialized or end-of-circle marker reached. [<mid> <mcontent> <attempt>] - If message should be (re)handled now.

IMPLEMENTATION DETAIL: Use `worker` instead.
Rotates queue's mid-circle and processes next mid. Returns:
  nil             - If msg GC'd, locked, or set to backoff.
  "eoq-backoff" - If circle uninitialized or end-of-circle marker reached.
  [<mid> <mcontent> <attempt>] - If message should be (re)handled now.
sourceraw docstring

enqueueclj

Pushes given message (any Clojure datatype) to named queue and returns unique message id or {:carmine.mq/error <message-status>}. Options:

  • unique-message-id - Specify an explicit message id (e.g. message hash) to perform a de-duplication check. If unspecified, a unique id will be auto-generated.
  • allow-requeue? - When true, allow buffered escrow-requeue for a message in the :locked or :done-with-backoff state.
Pushes given message (any Clojure datatype) to named queue and returns unique
message id or {:carmine.mq/error <message-status>}. Options:
  * unique-message-id  - Specify an explicit message id (e.g. message hash) to
                         perform a de-duplication check. If unspecified, a
                         unique id will be auto-generated.
  * allow-requeue?     - When true, allow buffered escrow-requeue for a
                         message in the :locked or :done-with-backoff state.
sourceraw docstring

exp-backoffclj

(exp-backoff n-attempt)
(exp-backoff n-attempt {:keys [min max factor] :or {factor 1000}})

Returns binary exponential backoff value for n<=36.

Returns binary exponential backoff value for n<=36.
sourceraw docstring

handle1clj

(handle1 conn-opts qname handler [mid mcontent attempt :as poll-reply])

Implementation detail!

Implementation detail!
sourceraw docstring

IWorkercljprotocol

startclj

(start this)

stopclj

(stop this)
source

make-dequeue-workerclj

(make-dequeue-worker pool
                     spec
                     &
                     {:keys [handler-fn handler-ttl-msecs backoff-msecs
                             throttle-msecs auto-start?]})

DEPRECATED: Use worker instead.

DEPRECATED: Use `worker` instead.
sourceraw docstring

message-statusclj

Returns current message status, e/o: :queued - Awaiting handler. :queued-with-backoff - Awaiting rehandling. :locked - Currently with handler. :locked-with-requeue - Currently with handler, will requeue on success. :done-awaiting-gc - Finished handling, awaiting GC. :done-with-backoff - Finished handling, awaiting dedupe timeout. nil - Already GC'd or invalid message id.

Returns current message status, e/o:
  :queued               - Awaiting handler.
  :queued-with-backoff  - Awaiting rehandling.
  :locked               - Currently with handler.
  :locked-with-requeue  - Currently with handler, will requeue on success.
  :done-awaiting-gc     - Finished handling, awaiting GC.
  :done-with-backoff    - Finished handling, awaiting dedupe timeout.
nil                   - Already GC'd or invalid message id.
sourceraw docstring

monitor-fnclj

(monitor-fn qname max-circle-size warn-backoff-ms)

Returns a worker monitor fn that warns when queue's mid-circle exceeds the prescribed size. A backoff timeout can be provided to rate-limit this warning.

Returns a worker monitor fn that warns when queue's mid-circle exceeds
the prescribed size. A backoff timeout can be provided to rate-limit this
warning.
sourceraw docstring

queue-statusclj

(queue-status conn-opts qname)
source

workerclj

(worker conn-opts
        qname
        &
        [{:keys [handler monitor lock-ms eoq-backoff-ms nthreads throttle-ms
                 auto-start]
          :as opts
          :or {handler (fn [args] (timbre/infof "%s" args) {:status :success})
               monitor (monitor-fn qname 1000 (enc/ms :hours 6))
               lock-ms (enc/ms :hours 1)
               nthreads 1
               throttle-ms 200
               eoq-backoff-ms exp-backoff
               auto-start true}}])

Returns a threaded worker to poll for and handle messages enqueue'd to named queue. Options: :handler - (fn [{:keys [qname mid message attempt]}]) that throws an ex or returns {:status <#{:success :error :retry}> :throwable <Throwable> :backoff-ms <retry-or-dedupe-backoff-ms}. :monitor - (fn [{:keys [mid-circle-size ndry-runs poll-reply]}]) called on each worker loop iteration. Useful for queue monitoring/logging. See also monitor-fn. :lock-ms - Max time handler may keep a message before handler considered fatally stalled and message re-queued. Must be sufficiently high to prevent double handling. :eoq-backoff-ms - Thread sleep period each time end of queue is reached. Can be a (fn [ndry-runs]) -> ms (n<=5) will be used. Sleep synchronized for all queue workers. :nthreads - Number of synchronized worker threads to use. :throttle-ms - Thread sleep period between each poll.

Returns a threaded worker to poll for and handle messages `enqueue`'d to
named queue. Options:
 :handler        - (fn [{:keys [qname mid message attempt]}]) that throws an ex
                   or returns {:status     <#{:success :error :retry}>
                               :throwable  <Throwable>
                               :backoff-ms <retry-or-dedupe-backoff-ms}.
 :monitor        - (fn [{:keys [mid-circle-size ndry-runs poll-reply]}])
                   called on each worker loop iteration. Useful for queue
                   monitoring/logging. See also `monitor-fn`.
 :lock-ms        - Max time handler may keep a message before handler
                   considered fatally stalled and message re-queued. Must be
                   sufficiently high to prevent double handling.
 :eoq-backoff-ms - Thread sleep period each time end of queue is reached.
                   Can be a (fn [ndry-runs]) -> ms (n<=5) will be used.
                   Sleep synchronized for all queue workers.
 :nthreads       - Number of synchronized worker threads to use.
 :throttle-ms    - Thread sleep period between each poll.
sourceraw docstring

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close