(-with-work-body-exception-handling worker context & body)
Handles exceptions for the worker body loop. It is abstracted so that the same exception logic can be applied both within the context of a specific queue-msg being handled and a few layers further up. The inner layer wraps the handling of the queue-msg specifically so that it can itself be wrapped in log/with-context, which gets that message's info into any logged exceptions.
(consume queue fun)
Takes a message from the queue and passes it to the specified fun. See unwrap-obj for the format of the message.
(current-depth queue)
Returns the number of objects in the queue
(current-processing queue)
(current-processing queue limit)
Returns the current set of jobs being processed, in their serialized form.
(current-queue queue)
(current-queue queue limit)
Returns all the objects in the queue.
(enqueue queue obj)
Puts the object on the queue.
(failed-messages queue)
(failed-messages queue limit)
Returns all of the failed objects for the queue.
(flush-queue queue)
Removes all events from the queue, including currently processing jobs, and failures.
(make-queue name conn-spec)
(make-queue name
            conn-spec
            {:keys [conn-pool max-depth retries serializer failure-max-depth
                    save-failures]
             :or {save-failures true retries 5 serializer identity-serializer}})
Returns a queue object, used in the rest of the MQ API. Must be given a `name`, which will be the name of the Redis key where events are stored. Events being processed by a worker will go into the `NAME-processing` key while being worked on, and are removed when done.
The `conn-spec` is a Carmine connection spec, a map containing a :host and :port key at the minimum.
The optional third argument is a map with the following keys:
* conn-pool - if provided, a Carmine connection pool to use.
* max-depth - the maximum number of events that can be on the queue; the oldest event is dropped when the next event is added.
* retries - how many times to retry processing an event. If exceeded, the event is considered failed. Retries will only happen if a reaper is running on the queue.
* serializer - an object implementing the Serializer protocol.
* save-failures - if true, the default value, failed events will be added to the `NAME-failures` key.
* failure-max-depth - like max-depth, but for the `NAME-failures` key.
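The `max-depth` rule can be illustrated without Redis. The following is a minimal in-memory sketch, not the library's implementation: `bounded-enqueue` is a hypothetical helper that models the queue as a vector with the oldest event first.

```clojure
;; Hypothetical in-memory model of the max-depth rule.
;; The real queue lives in Redis; this only shows the drop-oldest behaviour.
(defn bounded-enqueue
  "Appends event to events (oldest first). When max-depth is non-nil and the
  result would exceed it, the oldest events are dropped."
  [events max-depth event]
  (let [events' (conj (vec events) event)
        excess  (if max-depth
                  (max 0 (- (count events') max-depth))
                  0)]
    (subvec events' excess)))

(bounded-enqueue [:a :b :c] 3 :d)   ;; => [:b :c :d]
(bounded-enqueue [:a :b] nil :c)    ;; => [:a :b :c]
```

With no `max-depth` the queue simply grows; with one set, each enqueue onto a full queue discards the event at the head.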
(queue-counts {:keys [name processing-key failure-key] :as queue})
Returns a map containing a summary of the queue's state.
(queue-reaper delay & qs)
Creates a reaper, which will check for events that are stuck in 'processing' and clean them up. It will put them back on the queue and let them be retried. If the retry limit has been reached, it will fail them.
The reaper will be added to the `queue-reapers` atom.
While reapers are designed to behave properly if there are multiple reapers working the same queue, running more than one per queue is discouraged.
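The requeue-or-fail decision described above can be sketched as a pure function over in-memory data. This is an illustration under assumptions, not the library's code: `reap-pass`, the `stuck?` predicate, the state map shape, and the `:retries` counter stored on each event are all hypothetical.

```clojure
;; Hypothetical model of a single reaper pass.
;; state is {:queue [...] :processing [...] :failures [...]}.
(defn reap-pass
  "Moves every stuck event out of :processing. Events that have already been
  retried retry-limit times go to :failures; the rest go back on :queue with
  their :retries counter incremented."
  [state stuck? retry-limit]
  (reduce
    (fn [st event]
      (if-not (stuck? event)
        st
        (let [st (update st :processing
                         (fn [p] (vec (remove #(= % event) p))))]
          (if (>= (:retries event 0) retry-limit)
            (update st :failures conj event)
            (update st :queue conj (update event :retries (fnil inc 0)))))))
    state
    (:processing state)))

(reap-pass {:queue      []
            :processing [{:id 1 :retries 0} {:id 2 :retries 5}]
            :failures   []}
           (constantly true)
           5)
;; => {:queue [{:id 1, :retries 1}], :processing [], :failures [{:id 2, :retries 5}]}
```

How the real reaper decides that an event is stuck is not stated in the docstring, which is why `stuck?` is left as a caller-supplied predicate here.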
(queue-worker queue
              handler
              &
              {:keys [delay timeout exit-on-error threads]
               :or {delay 0 timeout 30 exit-on-error true threads 1}})
Start a queue worker, which will be added to the `queue-workers` atom. This worker will take messages off of `queue` and call the `handler` fn with them, one at a time.
* delay - how many seconds to wait after processing a message before getting the next.
* timeout - how long to block waiting for an event.
* exit-on-error - if a fatal error occurs, exit.
* threads - how many threads to start for this worker.
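The one-at-a-time consume loop with a blocking timeout can be sketched against a plain JVM queue. This is a simplified stand-in, not the worker's implementation: `drain-with-handler` is a hypothetical name, a `LinkedBlockingQueue` replaces Redis, and the loop stops at the first timed-out poll where a real worker would presumably keep polling.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

;; Hypothetical in-memory analogue of a worker's consume loop.
(defn drain-with-handler
  "Polls q, blocking up to timeout-ms for each message, and calls handler on
  each message in turn. Returns the number of messages handled once a poll
  times out."
  [^LinkedBlockingQueue q handler timeout-ms]
  (loop [handled 0]
    (if-some [msg (.poll q (long timeout-ms) TimeUnit/MILLISECONDS)]
      (do (handler msg)
          (recur (inc handled)))
      handled)))

(let [q (LinkedBlockingQueue.)]
  (.put q {:job "a"})
  (.put q {:job "b"})
  (drain-with-handler q println 50))
```

Each message is fully handled before the next poll starts, which is the same one-at-a-time property the docstring describes for a single worker thread.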
(reap-queue queue retry-limit old-message-set)
(rerun-failures {:keys [queue] :as options})
Takes all messages from the failures queue and adds them back to the input queue so that they are processed again.
(rerun-one {:keys [queue target-queue event-transform-fn k]
            :or {event-transform-fn identity target-queue queue}})
Serializer
Protocol for converting the given data into a form suitable for serialization to and from JSON.
(deserialize converter ext-value)
(serialize converter int-value)
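A custom serializer only needs to supply these two methods. The sketch below redeclares a local `Serializer` protocol with the same method signatures so that it runs on its own; in real code the library's own protocol would be extended instead. `EdnSerializer` is a hypothetical example that round-trips values through EDN strings rather than JSON, purely to avoid a JSON dependency.

```clojure
(require '[clojure.edn :as edn])

;; Local stand-in for the library's protocol, redeclared for illustration only.
(defprotocol Serializer
  (serialize [converter int-value])
  (deserialize [converter ext-value]))

;; Hypothetical serializer: internal values become EDN strings and back.
(defrecord EdnSerializer []
  Serializer
  (serialize [_ int-value] (pr-str int-value))
  (deserialize [_ ext-value] (edn/read-string ext-value)))

(let [s (->EdnSerializer)]
  (deserialize s (serialize s {:job "resize" :id 42})))
;; => {:job "resize", :id 42}
```

The useful property to check for any serializer is the round trip: `deserialize` applied to the output of `serialize` should return a value equal to the original.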
(stop-reaper qr)
Stops the reaper from running. It will also be removed from the `queue-reapers` atom.
(stop-worker qw)
Stop the queue worker. Removes it from the `queue-workers` atom.
(stop-workers workers)
A quicker way to stop workers, rather than waiting for each in turn.