
taoensso.carmine.message-queue

Carmine-backed Clojure message queue, v2.
All heavy lifting by Redis.

Uses an optimized message circle architecture that is simple, reliable,
and has pretty good throughput and latency.

See `mq-architecture.svg` in the repo for an architecture diagram,
and <http://antirez.com/post/250> for the initial inspiration.

Message status, one of:
  nil                  - Not in queue or already GC'd
  :queued              - Awaiting handler
  :queued-with-backoff - Awaiting handler, but skip until backoff expired
  :locked              - Currently with handler
  :locked-with-requeue - Currently with handler, will requeue when done
  :done-awaiting-gc    - Finished handling, awaiting GC
  :done-with-backoff   - Finished handling, awaiting GC,
                         but skip until dedupe backoff expired
  :done-with-requeue   - Will requeue, but skip until dedupe backoff expired

Redis keys (all prefixed with `carmine:mq:<qname>:`):
  * messages      - hash: {mid mcontent} ; Message content
  * messages-rq   - hash: {mid mcontent} ; '' for requeues
  * lock-times    - hash: {mid lock-ms}  ; Optional mid-specific lock duration
  * lock-times-rq - hash: {mid lock-ms}  ; '' for requeues
  * udts          - hash: {mid  udt-first-enqueued}
  * locks         - hash: {mid    lock-expiry-time} ; Active locks
  * backoffs      - hash: {mid backoff-expiry-time} ; Active backoffs
  * nattempts     - hash: {mid attempt-count}
  * done          - mid set: awaiting gc, etc.
  * requeue       - mid set: awaiting requeue ; Deprecated

  * mids-ready    - list: mids for immediate handling     (push to left, pop from right)
  * mid-circle    - list: mids for maintenance processing (push to left, pop from right)
  * ndry-runs     - int: num times worker(s) have lapped queue w/o work to do

  * isleep-a      - list: 0/1 sentinel element for `interruptible-sleep`
  * isleep-b      - list: 0/1 sentinel element for `interruptible-sleep`
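To inspect these keys by hand (e.g. from `redis-cli`), the documented prefix can be combined with a key name as in the sketch below. `queue-key` is a hypothetical helper written for illustration; it is not part of `taoensso.carmine.message-queue`.

```clojure
;; Hypothetical helper (not part of the library): builds the full Redis key
;; for a queue-level key name using the documented `carmine:mq:<qname>:` prefix.
(defn queue-key
  [qname key-name]
  (str "carmine:mq:" (name qname) ":" (name key-name)))

(queue-key "my-queue" :mid-circle) ; => "carmine:mq:my-queue:mid-circle"
```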

default-throttle-ms-fn (clj)

(default-throttle-ms-fn queue-size)

Default/example (fn [queue-size]) -> ?throttle-msecs
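A custom fn of the same shape might look like the sketch below. The name `my-throttle-ms` and all thresholds are illustrative assumptions, not the library's defaults, and the sketch assumes that a `nil` return means "do not sleep between polls".

```clojure
;; Hypothetical custom throttle fn: (fn [queue-size]) -> ?throttle-msecs.
;; Thresholds are illustrative only; nil is assumed to mean "no sleep".
(defn my-throttle-ms
  [queue-size]
  (cond
    (>= queue-size 1000) nil ; large backlog: poll as fast as possible
    (>= queue-size 100)  50  ; moderate backlog: short sleep
    :else                250 ; near-empty queue: longer sleep
    ))
```

A fn like this would be supplied through the worker's `:throttle-ms` option.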


enqueue (clj)

(enqueue qname message)
(enqueue qname
         message
         {:keys [mid init-backoff-ms lock-ms can-update? can-requeue?
                 reset-init-backoff?]})

Pushes given message (any Clojure data type) to named queue and returns
a map with keys: [success? mid action error].

When `success?` is true:  `mid`, `action` will be present, with
                          `action` ∈ #{:added :updated}.

When `success?` is false: `error` will be present, with
                          `error` ∈ #{:already-queued :locked :backoff}.

Options:
    `:init-backoff-ms` - Optional initial backoff in msecs.
    `:lock-ms`         - Optional lock time in msecs. When unspecified, the
                         worker's default lock time will be used.

    `:mid`             - Optional unique message id (e.g. message hash) to
                         identify a specific message for dedupe/update/requeue.
                         When unspecified, a random uuid will be used.

    `:can-update?`     - When true, will update message content and/or lock-ms for
                         an mid still awaiting handling.

    `:can-requeue?`    - When true, will mark message with `:locked` or
                         `:done-with-backoff` status so that it will be
                         automatically requeued after garbage collection.
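As a rough illustration of consuming the documented result map, the sketch below branches on `success?`, `action`, and `error`. `describe-enqueue-result` is a hypothetical helper, and the explanatory strings are this example's own interpretation of each value rather than wording from the library.

```clojure
;; Hypothetical helper: turns the documented `enqueue` result map
;; {:keys [success? mid action error]} into a human-readable string.
(defn describe-enqueue-result
  [{:keys [success? mid action error]}]
  (if success?
    (case action
      :added   (str "Added new message " mid)
      :updated (str "Updated pending message " mid)
      (str "Succeeded with unrecognised action " action))
    (case error
      :already-queued "Not enqueued: mid is already queued"
      :locked         "Not enqueued: mid is currently locked"
      :backoff        "Not enqueued: mid is in backoff"
      (str "Not enqueued: unrecognised error " error))))

;; Hand-written sample maps in the documented shape:
(describe-enqueue-result {:success? true  :mid "abc" :action :added})
(describe-enqueue-result {:success? false :error :locked})
```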

exp-backoff (clj)

(exp-backoff n-attempt)
(exp-backoff n-attempt {:keys [min max factor] :or {factor 1000}})

Returns binary exponential backoff value for n<=36.
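For readers unfamiliar with the technique, here is a minimal sketch of binary exponential backoff with the same option names (`:min`, `:max`, `:factor`). It is an independent illustration, not the library's implementation: the random jitter, the scaling by `factor`, and the name `binary-exp-backoff-sketch` are all assumptions.

```clojure
;; Illustrative sketch only; NOT the library's `exp-backoff`.
;; Assumes: base delay 2^n (n capped at 36), random jitter in the upper half
;; of the window, scaled by `factor`, then clamped to optional :min / :max.
(defn binary-exp-backoff-sketch
  ([n-attempt] (binary-exp-backoff-sketch n-attempt nil))
  ([n-attempt {min-ms :min, max-ms :max, factor :factor, :or {factor 1000}}]
   (let [n    (min 36 (long n-attempt))            ; cap the exponent
         base (Math/pow 2 n)                       ; 2^n
         t    (long (* 0.5 (+ base (rand base)) factor))
         t    (if min-ms (max t min-ms) t)         ; apply lower bound
         t    (if max-ms (min t max-ms) t)]        ; apply upper bound
     t)))

;; With the assumed formula, attempt 3 yields a value in [4000, 8000).
(binary-exp-backoff-sketch 3)
```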


IWorker (clj, protocol)

Private, please don't use this.

start (clj)

(start this)

stop (clj)

(stop this)

message-status (clj)

(message-status qname mid)

Returns current message status, one of:
`nil`                  - Not in queue or already GC'd
`:queued`              - Awaiting handler
`:queued-with-backoff` - Awaiting handler, but skip until backoff expired
`:locked`              - Currently with handler
`:locked-with-requeue` - Currently with handler, will requeue when done
`:done-awaiting-gc`    - Finished handling, awaiting GC
`:done-with-backoff`   - Finished handling, awaiting GC,
                         but skip until dedupe backoff expired
`:done-with-requeue`   - Will requeue, but skip until dedupe backoff expired
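Application code that only cares about the broad lifecycle stage can collapse these statuses into a few groups, as sketched below. The grouping and the names `status->phase`, `status-phase`, and `will-requeue?` are this example's own; they are not provided by the library.

```clojure
;; Hypothetical grouping of the documented statuses into coarse phases.
(def status->phase
  {nil                   :absent
   :queued               :queued
   :queued-with-backoff  :queued
   :locked               :locked
   :locked-with-requeue  :locked
   :done-awaiting-gc     :done
   :done-with-backoff    :done
   :done-with-requeue    :done})

(defn status-phase
  "Returns the coarse phase for a status, or :unknown if unrecognised."
  [status]
  (get status->phase status :unknown))

(defn will-requeue?
  "True for statuses documented as leading to a requeue."
  [status]
  (contains? #{:locked-with-requeue :done-with-requeue} status))
```

The value passed in would typically be the result of `(message-status qname mid)`.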

monitor-fn (clj)

(monitor-fn qname max-queue-size warn-backoff-ms)

Returns a worker monitor fn that warns when queue exceeds the prescribed size. A backoff timeout can be provided to rate-limit this warning.
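The same idea (a size threshold plus a rate-limited warning) can be sketched in plain Clojure as follows. This is an illustrative stand-in, not the library's `monitor-fn`: the name `my-monitor-fn` and the injected `warn!` callback are assumptions, and the returned fn takes the `{:keys [queue-size ...]}` map described for the worker's `:monitor` option.

```clojure
;; Illustrative sketch, NOT the library's `monitor-fn`.
;; Returns a (fn [{:keys [queue-size]}]) that calls `warn!` when the queue
;; exceeds `max-queue-size`, at most once per `warn-backoff-ms`.
(defn my-monitor-fn
  [max-queue-size warn-backoff-ms warn!]
  (let [last-warn-ms (atom nil)]
    (fn [{:keys [queue-size]}]
      (let [now  (System/currentTimeMillis)
            prev @last-warn-ms]
        (when (and queue-size
                   (> queue-size max-queue-size)
                   (or (nil? prev) (>= (- now prev) warn-backoff-ms))
                   ;; Only one thread wins the right to warn for this window
                   (compare-and-set! last-warn-ms prev now))
          (warn! queue-size)
          true)))))

;; Usage sketch: warn via println at most once a minute above 1000 messages.
(def my-monitor
  (my-monitor-fn 1000 60000 #(println "Queue size warning:" %)))
```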


queue-content (clj)

(queue-content conn-opts qname)

Returns detailed {<mid> {:keys [message status ...]}} map for every message currently in queue.

O(n_mids) and expensive, avoid use in production.


queue-names (clj)

(queue-names conn-opts)
(queue-names conn-opts pattern)

Returns a non-empty set of existing queue names, or nil. Executes a Redis scan command, so it is O(n) in the number of keys in the db.


queue-size (clj)

(queue-size conn-opts qname)

Returns in O(1) the approx number of messages awaiting handler for given named queue. Same as (:nqueued (queue-status conn-opts qname)).


queue-status (clj)

(queue-status conn-opts qname)

Returns in O(1) the approx {:keys [nqueued nlocked nbackoff ntotal]} counts for given named queue.

nlocked and nbackoff may include expired entries!


queues-clear!! (clj)

(queues-clear!! conn-opts qnames)

Permanently deletes ALL content for the Carmine message queues with given names.

Returns nil, or a non-empty vector of the queue names that were cleared.


queues-clear-all!!! (clj)

(queues-clear-all!!! conn-opts)

DANGER! Permanently deletes ALL content for ALL Carmine message queues. Returns nil, or a non-empty vector of the queue names that were cleared.


set-min-log-level! (clj)

(set-min-log-level! level)

Sets Timbre's minimum log level for internal Carmine message queue namespaces. Possible levels: #{:trace :debug :info :warn :error :fatal :report}. Default level: :info.


worker (clj)

(worker conn-opts qname)
(worker conn-opts
        qname
        {:keys [handler monitor lock-ms eoq-backoff-ms throttle-ms auto-start
                nthreads-worker nthreads-handler]
         :as worker-opts
         :or {handler (fn [m] (timbre/info m) {:status :success})
              monitor (monitor-fn qname 1000 (enc/ms :hours 6))
              lock-ms (enc/ms :mins 60)
              nthreads-worker 1
              nthreads-handler 1
              throttle-ms default-throttle-ms-fn
              eoq-backoff-ms exp-backoff
              auto-start true}})

Returns a stateful threaded `CarmineMessageQueueWorker` to handle messages
added to named queue with `enqueue`.

API:
  - (deref <worker>)          => Detailed worker status map (see "Debugging" below).
  - (<worker> :start)         => Same as calling (start <worker>).
  - (<worker> :stop)          => Same as calling (stop  <worker>).
  - (<worker> :queue-size)    => Same as calling `queue-size`    for given qname.
  - (<worker> :queue-status)  => Same as calling `queue-status`  for given qname.
  - (<worker> :queue-content) => Same as calling `queue-content` for given qname.

Debugging:

  To debug a worker, deref it to see detailed status map with keys:
    `:qname`     - Worker's queue name (string)
    `:opts`      - Worker's            options map
    `:conn-opts` - Worker's connection options map
    `:running?`  - Is the worker currently running? (true/false)
    `:nthreads`  - {:keys [worker handler]}
    `:stats`
      `:queue-size`        - {:keys [last min max mean mad var p50 p90 ...]}
      `:queueing-time-ms`  - {:keys [last min max mean mad var p50 p90 ...]}
      `:handling-time-ns`  - {:keys [last min max mean mad var p50 p90 ...]}
      `:counts`
        `:handler/success` - Number of handler calls with `:success` status
        `:handler/error`   - Number of handler calls with `:error`   status
        `:handler/retry`   - Number of handler calls with `:retry`   status
        `:handler/backoff` - Number of handler calls encountering an mid in backoff
        ...

  See also the `:monitor` option below, and utils:
    `queue-names`, `queue-size`, `queue-content`, `message-status`, etc.
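As a small example of reading the status map, the sketch below derives a handler error rate from the `:counts` entries. It assumes the nesting shown above (`:counts` under `:stats`); `handler-error-rate` is a hypothetical helper and the sample map is hand-written, not real worker output.

```clojure
;; Hypothetical helper: fraction of handler calls that ended in :error,
;; computed from a derefed worker status map. Assumes counts live at
;; [:stats :counts], per the nesting shown in the docstring.
(defn handler-error-rate
  [status]
  (let [counts  (get-in status [:stats :counts])
        success (get counts :handler/success 0)
        error   (get counts :handler/error 0)
        retry   (get counts :handler/retry 0)
        total   (+ success error retry)]
    (when (pos? total)
      (double (/ error total)))))

;; Hand-written sample in the documented shape:
(handler-error-rate
  {:qname "my-queue"
   :stats {:counts {:handler/success 8 :handler/error 2}}})
```

With a live worker, the argument would be the derefed worker itself, e.g. `(handler-error-rate @my-worker)` for some worker bound to a hypothetical `my-worker` var.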

Options:

  `:handler`
    (fn [{:keys [qname mid message attempt]}]) called for each worker message.
    Should return a map with possible keys:
      `:status`     - ∈ {:success :error :retry}
      `:throwable`  - Optional Throwable when relevant
      `:backoff-ms` - Optional time (in milliseconds) to backoff
                      for dedupe or before retrying

    If handler throws ANY `Throwable`, will assume `:error` status and NOT retry.
    For custom error handling, make sure to use an appropriate try/catch
    within your handler fn!

  `:monitor` - (fn [{:keys [queue-size ndry-runs poll-reply]}])
    Called on each worker loop iteration. Useful for queue monitoring/logging.
    See also `monitor-fn`.

  `:lock-ms` (default 60 minutes)
    Default time (in milliseconds) that handler may keep a message before handler
    is considered fatally stalled and message is re-queued. Must be large enough
    to prevent double handling!

    Can be overridden on a per-message basis via `enqueue`.

  `:throttle-ms` (default `default-throttle-ms-fn`)
    Thread sleep period (in milliseconds) between each poll.
    Can be a (fn [queue-size]) -> ?sleep-msecs.

  `:eoq-backoff-ms` (default `exp-backoff`)
    Max time (in milliseconds) to sleep thread each time end of queue is reached.
    Can be a (fn [ndry-runs]) -> msecs for n<=5.
    Sleep may be interrupted when new messages are enqueued.
    If present, connection read timeout should be >= max msecs!

  `:nthreads-worker`  - Number of threads to monitor and maintain queue.
  `:nthreads-handler` - Number of threads to handle queue messages with handler fn.
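Because a thrown `Throwable` is treated as `:error` with no retry, a handler that wants retries needs its own try/catch. The sketch below shows one possible shape. `make-handler`, the injected `process!` fn, the retry-on-`IOException` policy, the fixed 5000 ms backoff, and the comparison of `attempt` against `max-attempts` are all illustrative assumptions; only the argument keys and the returned `:status`, `:throwable`, and `:backoff-ms` keys come from the docstring above.

```clojure
;; Hypothetical handler factory. `process!` is the caller's own
;; (fn [message]) that does the real work and may throw.
(defn make-handler
  [process! max-attempts]
  (fn [{:keys [message attempt]}]
    (try
      (process! message)
      {:status :success}
      ;; Treat I/O failures as transient: retry until max-attempts is reached
      (catch java.io.IOException e
        (if (< attempt max-attempts)
          {:status :retry, :throwable e, :backoff-ms 5000}
          {:status :error, :throwable e}))
      ;; Anything else is reported as an error without retrying
      (catch Throwable t
        {:status :error, :throwable t}))))

;; Usage sketch: a handler that prints each message.
(def my-handler (make-handler println 3))
```

The resulting fn would be supplied as the `:handler` option in the worker's options map.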
