Core protocols, constructors, and worker loop.
Core protocols, constructors, and worker loop.
(make-qbackend {:keys [queue dequeue requeue ack nack abandon
on-unexpected-error]
:or {dequeue (fn [_ _])
requeue (fn [_ _ _ _])
ack (fn [_ _ _])
nack (fn [_ _ _])
abandon (fn [_ _ _])
on-unexpected-error (fn [_ _ _])}})Constructs a QBackend from a map of functions.
Required key:
:queue — the queue object passed as the first argument all functionsOptional keys (all default to no-ops):
:dequeue — (fn [queue ctx]):requeue — (fn [queue task opts]):ack — (fn [queue task]):nack — (fn [queue task]):abandon — (fn [queue task]):on-unexpected-error — (fn [queue ctx throwable])Constructs a `QBackend` from a map of functions. Required key: - `:queue` — the queue object passed as the first argument all functions Optional keys (all default to no-ops): - `:dequeue` — `(fn [queue ctx])` - `:requeue` — `(fn [queue task opts])` - `:ack` — `(fn [queue task])` - `:nack` — `(fn [queue task])` - `:abandon` — `(fn [queue task])` - `:on-unexpected-error` — `(fn [queue ctx throwable])`
(make-qtask {:keys [task get-id ready? execute on-complete
get-unreadiness-policy on-unreadiness get-failure-policy
on-failure]
:or {ready? (fn [_ _] true)
on-complete (fn [_ _ _])
on-unreadiness (fn [_ _ _])
on-failure (fn [_ _ _ _])
get-failure-policy (fn [_ _ _] {:action :nack})
get-unreadiness-policy (fn [_ _] {:action :requeue})}})Constructs a QTask from a map of functions and the raw task data.
Required keys:
:task — the raw task value:get-id — fn or keyword to extract the task id from raw task data:execute — (fn [this ctx]) performs the workOptional keys (all have sane defaults):
:ready? — (fn [this ctx]), defaults to (constantly true):on-complete — (fn [this ctx result]), defaults to no-op:get-unreadiness-policy — (fn [this ctx]), defaults to {:action :requeue}:on-unreadiness — (fn [this ctx policy]), defaults to no-op:get-failure-policy — (fn [this ctx throwable]), defaults to {:action :nack}:on-failure — (fn [this ctx policy throwable]), defaults to no-opConstructs a `QTask` from a map of functions and the raw task data.
Required keys:
- `:task` — the raw task value
- `:get-id` — fn or keyword to extract the task id from raw task data
- `:execute` — `(fn [this ctx])` performs the work
Optional keys (all have sane defaults):
- `:ready?` — `(fn [this ctx])`, defaults to `(constantly true)`
- `:on-complete` — `(fn [this ctx result])`, defaults to no-op
- `:get-unreadiness-policy` — `(fn [this ctx])`, defaults to `{:action :requeue}`
- `:on-unreadiness` — `(fn [this ctx policy])`, defaults to no-op
- `:get-failure-policy` — `(fn [this ctx throwable])`, defaults to `{:action :nack}`
- `:on-failure` — `(fn [this ctx policy throwable])`, defaults to no-opRepresents a queue system. All methods except on-unexpected-error!
are automatically wrapped, enriched with phase and task context and
routed to on-unexpected-error!. All methods except on-unexpected-error!
receive a context map as the last argument
Represents a queue system. All methods except `on-unexpected-error!` are automatically wrapped, enriched with phase and task context and routed to `on-unexpected-error!`. All methods except `on-unexpected-error!` receive a context map as the last argument
(abandon! this task ctx)Called when a task is not ready and its unreadiness policy action is not
:requeue.
Called when a task is not ready and its unreadiness policy action is not `:requeue`.
(ack! this task ctx)Mark a task as successfully completed.
Mark a task as successfully completed.
(dequeue! this ctx)Take the next task from the queue. Returns the raw task value, or nil if the queue is empty.
Take the next task from the queue. Returns the raw task value, or nil if the queue is empty.
(nack! this task ctx)Mark a task as failed.
Mark a task as failed.
(on-unexpected-error! this ctx throwable)Called when any phase other than execute! throws.
The throwable's ex-data contains:
:phase: keyword identifying the phase (:ready?, :ack, :requeue etc.):task: the raw task data, when the error occurred after dequeueCalled when any phase other than `execute!` throws. The throwable's `ex-data` contains: - `:phase`: keyword identifying the phase (`:ready?`, `:ack`, `:requeue` etc.) - `:task`: the raw task data, when the error occurred after dequeue
(requeue! this task opts ctx)Return a task to the queue. opts is the :requeue-opts map from the
task's failure or unreadiness policy, and may contain hints such as delay.
Return a task to the queue. `opts` is the `:requeue-opts` map from the task's failure or unreadiness policy, and may contain hints such as delay.
Specification of a task.
Specification of a task.
(get-failure-policy this ctx throwable)Called when execute! throws. Return a policy map with an :action key:
{:action :requeue, :requeue-opts {...}} — put the task back in the queue{:action <anything-else>} — call on-failure! then nack!Called when `execute!` throws. Return a policy map with an `:action` key:
- `{:action :requeue, :requeue-opts {...}}` — put the task back in the queue
- `{:action <anything-else>}` — call `on-failure!` then `nack!`(execute! this ctx)Perform the task's work. If it throws it is regarded as failure and next actions
are dictated by a failure policy. Otherwise, a result value is returned
and passed to on-complete!.
Perform the task's work. If it throws it is regarded as failure and next actions are dictated by a failure policy. Otherwise, a result value is returned and passed to `on-complete!`.
(ready? this ctx)Return true if the task's preconditions are satisfied and it should be executed.
Return true if the task's preconditions are satisfied and it should be executed.
(task-id this)Return the task's unique identifier.
Return the task's unique identifier.
(on-complete! this ctx result)Called after a successful execute! and after ack! is called on the
backend.
Called after a successful `execute!` and after `ack!` is called on the backend.
(get-raw this)Return the raw task data as it was dequeued.
Return the raw task data as it was dequeued.
(on-unreadiness! this ctx policy)Applies any other unreadiness policy than :requeue after abandon!
is called on the backend.
Applies any other unreadiness policy than :requeue after `abandon!` is called on the backend.
(get-unreadiness-policy this ctx)Called when ready? returns false. Return a policy map with an :action
key:
{:action :requeue, :requeue-opts {...}} — put the task back in the queue{:action <anything-else>} — call on-unreadiness! then abandon!Called when `ready?` returns false. Return a policy map with an `:action`
key:
- `{:action :requeue, :requeue-opts {...}}` — put the task back in the queue
- `{:action <anything-else>}` — call `on-unreadiness!` then `abandon!`(on-failure! this ctx policy throwable)Applies any other failure policy than :requeue, after nack! is called on the queue backend
Applies any other failure policy than :requeue, after nack! is called on the queue backend
(worker-loop {:keys [queue-backend ctx task-config]})Executes one iteration of the worker loop against the given queue backend.
Accepts a map with:
:queue-backend — a QBackend instance:ctx — arbitrary context map injected into all task functions:task-config — task configuration map passed to make-qtaskExecutes one iteration of the worker loop against the given queue backend. Accepts a map with: - `:queue-backend` — a `QBackend` instance - `:ctx` — arbitrary context map injected into all task functions - `:task-config` — task configuration map passed to `make-qtask`
(wrapped-call phase body)(wrapped-call phase task body)Optionally enrich any thrown exception with a given phase
and possibly a task. The error is delivered to on-unexpected-error!.
Optionally enrich any thrown exception with a given phase and possibly a task. The error is delivered to `on-unexpected-error!`.
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |