Concurrent worker pool that claims jobs from the queue and executes task handlers.
All workers share a single local queue backed by job-manager. The pool uses one worker-id for all job claims; see worker-id to retrieve it.
Task functions are plain Clojure functions keyed by task-identifier string:
(def tasks {"send-email" (fn [{:keys [job]}] (send! (:payload job))) "resize-image" (fn [{:keys [job job-system worker-id]}] ...)})
A task fn receives a context map with: :job - the full job map returned by job-manager/get-jobs :job-system - the {:pool :validator} system map :worker-id - the pool's worker-id string
Task outcome is determined by the return value or exception: • return any value (including nil) - job completes successfully • return (partial-success m) - job reported as partial success • throw any Throwable - job is failed (retry or exhaust)
Typical usage:
(def pool (-> (worker-pool/create-pool sys tasks emitter {:concurrency 4}) worker-pool/start!))
;; On application shutdown (worker-pool/stop! pool 15000)
Standard events emitted by the pool (via the supplied emitter): :pool/start - pool started; :concurrency in data :pool/stop - pool stopped; :forced? in data :job/completed - task fn returned normally :job/failed - task fn threw; retry scheduled :job/exhausted - task fn threw on the final allowed attempt :job/partial-success - task fn returned a partial-success value :worker/error - infrastructure exception outside task execution
Concurrent worker pool that claims jobs from the queue and executes task handlers.
All workers share a single local queue backed by job-manager. The pool uses one
worker-id for all job claims; see worker-id to retrieve it.
Task functions are plain Clojure functions keyed by task-identifier string:
(def tasks
{"send-email" (fn [{:keys [job]}] (send! (:payload job)))
"resize-image" (fn [{:keys [job job-system worker-id]}] ...)})
A task fn receives a context map with:
:job - the full job map returned by job-manager/get-jobs
:job-system - the {:pool :validator} system map
:worker-id - the pool's worker-id string
Task outcome is determined by the return value or exception:
• return any value (including nil) - job completes successfully
• return (partial-success m) - job reported as partial success
• throw any Throwable - job is failed (retry or exhaust)
Typical usage:
(def pool
(-> (worker-pool/create-pool sys tasks emitter {:concurrency 4})
worker-pool/start!))
;; On application shutdown
(worker-pool/stop! pool 15000)
Standard events emitted by the pool (via the supplied emitter):
:pool/start - pool started; :concurrency in data
:pool/stop - pool stopped; :forced? in data
:job/completed - task fn returned normally
:job/failed - task fn threw; retry scheduled
:job/exhausted - task fn threw on the final allowed attempt
:job/partial-success - task fn returned a partial-success value
:worker/error - infrastructure exception outside task execution(active-workers pool)Returns the number of worker threads currently executing a task.
Returns the number of worker threads currently executing a task.
(create-pool job-system task-registry emitter)(create-pool job-system task-registry emitter config)Creates a worker pool. Call start! to begin processing. job-system is {:pool datasource :validator validator}. task-registry is a map of task-identifier -> handler fn. emitter is a monitoring/Emitter. config keys: :concurrency, :poll-interval-ms, :queue-size, :queue-ttl-ms, :graceful-shutdown-timeout-ms, :task-identifiers, :forbidden-flags.
Creates a worker pool. Call start! to begin processing.
job-system is {:pool datasource :validator validator}.
task-registry is a map of task-identifier -> handler fn.
emitter is a monitoring/Emitter.
config keys: :concurrency, :poll-interval-ms, :queue-size, :queue-ttl-ms,
:graceful-shutdown-timeout-ms, :task-identifiers, :forbidden-flags.(force-stop! pool)Immediately interrupts all worker threads and stops the pool. Returns pool.
Immediately interrupts all worker threads and stops the pool. Returns pool.
(partial-success partial-results)Wraps partial-results so the pool treats the task return as a partial success. partial-results keys: :completed-steps, :failed-steps, :retry-from-step, :results.
Wraps partial-results so the pool treats the task return as a partial success. partial-results keys: :completed-steps, :failed-steps, :retry-from-step, :results.
(partial-success? v)Returns true if v was produced by partial-success.
Returns true if v was produced by partial-success.
(running? pool)Returns true if the pool has been started and not yet stopped.
Returns true if the pool has been started and not yet stopped.
(start! pool)Starts the pool and all worker threads. Throws if already running. Returns pool.
Starts the pool and all worker threads. Throws if already running. Returns pool.
(stats pool)Returns a snapshot of pool operational metrics. Keys: :active, :completed, :failed, :errors.
Returns a snapshot of pool operational metrics. Keys: :active, :completed, :failed, :errors.
(stop! pool)(stop! pool timeout-ms)Gracefully stops the pool, waiting up to timeout-ms for in-flight jobs to finish. Jobs buffered but not yet dispatched remain locked in the DB; they are recovered by ResetOverdueJobs after config.lock_timeout. Returns pool.
Gracefully stops the pool, waiting up to timeout-ms for in-flight jobs to finish. Jobs buffered but not yet dispatched remain locked in the DB; they are recovered by ResetOverdueJobs after config.lock_timeout. Returns pool.
(worker-id pool)Returns the worker-id used for all job claims from this pool.
Returns the worker-id used for all job claims from this pool.
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |