Core protocols, constructors, and worker loop.
Core protocols, constructors, and worker loop.
(make-qbackend {:keys [queue dequeue requeue ack nack abandon
on-unexpected-error]
:or {dequeue (fn [_])
requeue (fn [_ _ _])
ack (fn [_ _])
nack (fn [_ _])
abandon (fn [_ _])
on-unexpected-error (fn [_ _ _])}})Constructs a QBackend from a map of functions.
Required key:
:queue — the queue object passed as the first argument to :dequeue and :requeueOptional keys (all default to no-ops):
:dequeue — (fn [queue]) returns raw task data or nil:requeue — (fn [queue task opts]):ack — (fn [this task]):nack — (fn [this task]):abandon — (fn [this task]):on-unexpected-error — (fn [this ctx throwable])Constructs a `QBackend` from a map of functions. Required key: - `:queue` — the queue object passed as the first argument to `:dequeue` and `:requeue` Optional keys (all default to no-ops): - `:dequeue` — `(fn [queue])` returns raw task data or nil - `:requeue` — `(fn [queue task opts])` - `:ack` — `(fn [this task])` - `:nack` — `(fn [this task])` - `:abandon` — `(fn [this task])` - `:on-unexpected-error` — `(fn [this ctx throwable])`
(make-qtask {:keys [task get-id ready? execute on-complete
get-unreadiness-policy on-unreadiness get-failure-policy
on-failure]
:or {ready? (fn [_ _] true)
on-complete (fn [_ _ _])
on-unreadiness (fn [_ _ _])
on-failure (fn [_ _ _ _])
get-failure-policy (fn [_ _ _] {:action :nack})
get-unreadiness-policy (fn [_ _] {:action :requeue})}})Constructs a QTask from a map of functions and the raw task data.
Required keys:
:task — the raw task value:get-id — fn or keyword to extract the task id from raw task data:execute — (fn [this ctx]) performs the workOptional keys (all have sane defaults):
:ready? — (fn [this ctx]), defaults to (constantly true):on-complete — (fn [this ctx result]), defaults to no-op:get-unreadiness-policy — (fn [this ctx]), defaults to {:action :requeue}:on-unreadiness — (fn [this ctx policy]), defaults to no-op:get-failure-policy — (fn [this ctx throwable]), defaults to {:action :nack}:on-failure — (fn [this ctx policy throwable]), defaults to no-opConstructs a `QTask` from a map of functions and the raw task data.
Required keys:
- `:task` — the raw task value
- `:get-id` — fn or keyword to extract the task id from raw task data
- `:execute` — `(fn [this ctx])` performs the work
Optional keys (all have sane defaults):
- `:ready?` — `(fn [this ctx])`, defaults to `(constantly true)`
- `:on-complete` — `(fn [this ctx result])`, defaults to no-op
- `:get-unreadiness-policy` — `(fn [this ctx])`, defaults to `{:action :requeue}`
- `:on-unreadiness` — `(fn [this ctx policy])`, defaults to no-op
- `:get-failure-policy` — `(fn [this ctx throwable])`, defaults to `{:action :nack}`
- `:on-failure` — `(fn [this ctx policy throwable])`, defaults to no-opRepresents a queue system. All methods except on-unexpected-error!
are automatically wrapped, enriched with phase and task context and
routed to on-unexpected-error!.
Represents a queue system. All methods except `on-unexpected-error!` are automatically wrapped, enriched with phase and task context and routed to `on-unexpected-error!`.
(abandon! this task)Called when a task is not ready and its unreadiness policy action is not
:requeue.
Called when a task is not ready and its unreadiness policy action is not `:requeue`.
(ack! this task)Mark a task as successfully completed.
Mark a task as successfully completed.
(dequeue! this)Take the next task from the queue. Returns the raw task value, or nil if the queue is empty.
Take the next task from the queue. Returns the raw task value, or nil if the queue is empty.
(nack! this task)Mark a task as failed.
Mark a task as failed.
(on-unexpected-error! this ctx throwable)Called when any phase other than execute! throws.
The throwable's ex-data contains:
:phase: keyword identifying the phase (:ready?, :ack, :requeue etc.):task: the raw task data, when the error occurred after dequeueCalled when any phase other than `execute!` throws. The throwable's `ex-data` contains: - `:phase`: keyword identifying the phase (`:ready?`, `:ack`, `:requeue` etc.) - `:task`: the raw task data, when the error occurred after dequeue
(requeue! this task opts)Return a task to the queue. opts is the :requeue-opts map from the
task's failure or unreadiness policy, and may contain hints such as delay.
Return a task to the queue. `opts` is the `:requeue-opts` map from the task's failure or unreadiness policy, and may contain hints such as delay.
Specification of a task.
Specification of a task.
(get-failure-policy this ctx throwable)Called when execute! throws. Return a policy map with an :action key:
{:action :requeue, :requeue-opts {...}} — put the task back in the queue{:action <anything-else>} — call on-failure! then nack!Called when `execute!` throws. Return a policy map with an `:action` key:
- `{:action :requeue, :requeue-opts {...}}` — put the task back in the queue
- `{:action <anything-else>}` — call `on-failure!` then `nack!`(execute! this ctx)Perform the task's work. If it throws it is regarded as failure and next actions
are dictated by a failure policy. Otherwise, a result value is returned
and passed to on-complete!.
Perform the task's work. If it throws it is regarded as failure and next actions are dictated by a failure policy. Otherwise, a result value is returned and passed to `on-complete!`.
(ready? this ctx)Return true if the task's preconditions are satisfied and it should be executed.
Return true if the task's preconditions are satisfied and it should be executed.
(task-id this)Return the task's unique identifier.
Return the task's unique identifier.
(on-complete! this ctx result)Called after a successful execute!, before ack! is called on the
backend.
Called after a successful `execute!`, before `ack!` is called on the backend.
(get-raw this)Return the raw task data as it was dequeued.
Return the raw task data as it was dequeued.
(on-unreadiness! this ctx policy)Applies any other unreadiness policy than :requeue before abandon!
is called on the backend.
Applies any other unreadiness policy than :requeue before `abandon!` is called on the backend.
(get-unreadiness-policy this ctx)Called when ready? returns false. Return a policy map with an :action
key:
{:action :requeue, :requeue-opts {...}} — put the task back in the queue{:action <anything-else>} — call on-unreadiness! then abandon!Called when `ready?` returns false. Return a policy map with an `:action`
key:
- `{:action :requeue, :requeue-opts {...}}` — put the task back in the queue
- `{:action <anything-else>}` — call `on-unreadiness!` then `abandon!`(on-failure! this ctx policy throwable)Applies any other failure policy than :requeue, before nack! is called on the queue backend
Applies any other failure policy than :requeue, before nack! is called on the queue backend
(worker-loop {:keys [queue-backend ctx task-config]})Executes one iteration of the worker loop against the given queue backend.
Accepts a map with:
:queue-backend — a QBackend instance:ctx — arbitrary context map injected into all task functions:task-config — task configuration map passed to make-qtaskExecutes one iteration of the worker loop against the given queue backend. Accepts a map with: - `:queue-backend` — a `QBackend` instance - `:ctx` — arbitrary context map injected into all task functions - `:task-config` — task configuration map passed to `make-qtask`
(wrapped-call phase body)(wrapped-call phase task body)Optionally enrich any thrown exception with a given phase
and possibly a task. The error is delivered to on-unexpected-error!.
Optionally enrich any thrown exception with a given phase and possibly a task. The error is delivered to `on-unexpected-error!`.
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |