Liking cljdoc? Tell your friends :D

redismq.core


-with-work-body-exception-handlingcljmacro

(-with-work-body-exception-handling worker context & body)

Handles exceptions for the worker body loop. Why? It is abstracted so that the same exception logic can be used both for, within the context of a specific queue-msg being handled, and also wrapped a few layers up. We want this inner layer wrapped around the handling of the queue-msg specifically because then we can wrap it in log/with-context to get that info into any logged exceptions.

Handles exceptions for the worker body loop.
Why? It is abstracted so that the same exception logic can be used both for,
within the context of a specific queue-msg being handled, and also wrapped a
few layers up. We want this inner layer wrapped around the handling of the
queue-msg specifically because then we can wrap it in log/with-context to get
that info into any logged exceptions.
raw docstring

consumeclj

(consume queue fun)

Takes a message from the queue and passes it to the specified fun. See unwrap-obj for the format of the message.

Takes a message from the queue and passes it to the specified fun. See
unwrap-obj for the format of the message.
raw docstring

current-depthclj

(current-depth queue)

Returns the number of objects in the queue

Returns the number of objects in the queue
raw docstring

current-processingclj

(current-processing queue)
(current-processing queue limit)

Returns the current set of jobs being processed, in their serialized form.

Returns the current set of jobs being processed, in their serialized form.
raw docstring

current-queueclj

(current-queue queue)
(current-queue queue limit)

Returns all the objects in the queue.

Returns all the objects in the queue.
raw docstring

enqueueclj

(enqueue queue obj)

Puts the object on the queue.

Puts the object on the queue.
raw docstring

failed-messagesclj

(failed-messages queue)
(failed-messages queue limit)

Returns all of the failed objets for the queue

Returns all of the failed objets for the queue
raw docstring

flush-queueclj

(flush-queue queue)

Removes all events from the queue, including currently processing jobs, and failures.

Removes all events from the queue, including currently processing jobs, and failures.
raw docstring

identity-serializerclj


make-queueclj

(make-queue name conn-spec)
(make-queue name
            conn-spec
            {:keys [conn-pool max-depth retries serializer failure-max-depth
                    save-failures]
             :or {save-failures true retries 5 serializer identity-serializer}})

Returns a queue object, used in the rest of the MQ api. Must be given a name, which will be the name of the Redis key where events are stored. Events being procesed by a worker will go into the NAME-procesing key while being worked on, and removed when done. The conn-spec is a Carmine connection spec, a map containing a :host and :port key at the minimum.

It also takes the following keyword arguments:

  • conn-pool - if provided, a carmine connection pool to use

  • max-depth - the most events that can be on the queue, drops oldest event when the next event is added.

  • retries - how many times to rretry processing an event. If exceeded, the event is considered failed. Retries will only happen if a reaper is running on the queue.

  • serializer - an object implementing the Serializer protocol.

  • save-failures - if true, the default value, failed events will be added to the NAME-failures key.

  • failures-max-depth - like max depth, but for the NAME-failures key

Returns a queue object, used in the rest of the MQ api.  Must be
given a `name`, which will be the name of the Redis key where events
are stored.  Events being procesed by a worker will go into the
`NAME-procesing` key while being worked on, and removed when done.
The `conn-spec` is a Carmine connection spec, a map containing
a :host and :port key at the minimum.

It also takes the following keyword arguments:

  * conn-pool - if provided, a carmine connection pool to use

  * max-depth - the most events that can be on the queue, drops oldest
  event when the next event is added.

  * retries - how many times to rretry processing an event.  If
  exceeded, the event is considered failed.  Retries will only happen
  if a reaper is running on the queue.

  * serializer - an object implementing the Serializer protocol.

  * save-failures - if true, the default value, failed events will be added to the `NAME-failures` key.

  * failures-max-depth - like max depth, but for the `NAME-failures` key
raw docstring

queue-countsclj

(queue-counts {:keys [name processing-key failure-key] :as queue})

Returns a map containing a summary of the queues state

Returns a map containing a summary of the queues state
raw docstring

queue-reaperclj

(queue-reaper delay & qs)

Creates a reaper, which will chec for events that are stuck in 'processing' and clean them up. It will put them back on the queue, and let them be retried. if the retry limit has been reached, it will fail them.

The reaper will be added to the queue-reapers atom.

While reapers are designed to behave properly if there are multiple reapers working the same queue, it is discouraged.

Creates a reaper, which will chec for events that are stuck in
'processing' and clean them up.  It will put them back on the queue,
and let them be retried.  if the retry limit has been reached, it
will fail them.

The reaper will be added to the `queue-reapers` atom.

While reapers are designed to behave properly if there are multiple
reapers working the same queue, it is discouraged.
raw docstring

queue-reapersclj


queue-workerclj

(queue-worker queue
              handler
              &
              {:keys [delay timeout exit-on-error threads]
               :or {delay 0 timeout 30 exit-on-error true threads 1}})

Start a queue worker, which will be added to the queue-worker atom. This worker will take messages off of queue, and call the handler fn with them, one at a time.

  • delay - how many seconds to wait after processing a message before getting the next.
  • timeout - how long to block waiting for an event
  • exit-on-error - If a fatal error occurs, exit.
  • threads - how many threads to start for this worker.
Start a queue worker, which will be added to the `queue-worker`
atom.  This worker will take messages off of `queue`, and call the
`handler` fn with them, one at a time.

* delay - how many seconds to wait after processing a message before getting the next.
* timeout - how long to block waiting for an event
* exit-on-error - If a fatal error occurs, exit.
* threads - how many threads to start for this worker.
raw docstring

queue-workersclj


reap-queueclj

(reap-queue queue retry-limit old-message-set)

rerun-failuresclj

(rerun-failures {:keys [queue] :as options})

Takes all messages from the failures queue and adds them back to the processing queue, input queue again.

Takes all messages from the failures queue and adds them back to the
processing queue, input queue again.
raw docstring

rerun-oneclj

(rerun-one {:keys [queue target-queue event-transform-fn k]
            :or {event-transform-fn identity target-queue queue}})

Serializecljprotocol

Protocol for converting the given data into a form suitable for serialization to and from JSON.

Protocol for converting the given data into a form suitable for
serialization to and from JSON.

deserializeclj

(deserialize converter ext-value)

serializeclj

(serialize converter int-value)
raw docstring

stop-reaperclj

(stop-reaper qr)

Stop reaper from running. It will also be removed from the queue-reapers atom

Stop reaper from running.  It will also be removed from the queue-reapers atom
raw docstring

stop-workerclj

(stop-worker qw)

Stop the queue worker. Removes it from the queue-workers atom.

Stop the queue worker.  Removes it from the `queue-workers` atom.
raw docstring

stop-workersclj

(stop-workers workers)

A quicker way to stop workers, rather than waiting for each in turn.

A quicker way to stop workers, rather than waiting for each in turn.
raw docstring

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close