In-process job buffer with background polling from job-manager.
Jobs are claimed from the database in batches and held in a LinkedBlockingQueue until a worker calls take-job!. The poll loop runs as a daemon thread and automatically refetches when the buffer depth drops below the configured threshold.
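The buffering pattern described above can be sketched as follows. This is a minimal illustration, not the library's implementation: `claim-batch!` is a hypothetical stand-in for the database claim, and the option names `:threshold`, `:batch-size`, and `:poll-ms` are assumptions.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

(defn start-buffer
  "Starts a daemon thread that refills a LinkedBlockingQueue whenever its
  depth drops below `threshold`. `claim-batch!` is a hypothetical function
  of one argument (the batch size) that returns a collection of jobs."
  [claim-batch! {:keys [threshold batch-size poll-ms]}]
  (let [buffer   (LinkedBlockingQueue.)
        running? (atom true)
        poller   (Thread.
                  (fn []
                    (while @running?
                      ;; Refetch only when the buffer is running low.
                      (when (< (.size buffer) threshold)
                        (doseq [job (claim-batch! batch-size)]
                          (.put buffer job)))
                      (Thread/sleep (long poll-ms)))))]
    ;; A daemon thread does not keep the JVM alive on exit.
    (doto poller (.setDaemon true) (.start))
    {:buffer buffer :running? running?}))

(defn take-from-buffer!
  "Blocks up to `timeout-ms` for a job; returns nil on timeout."
  [{:keys [buffer]} timeout-ms]
  (.poll ^LinkedBlockingQueue buffer (long timeout-ms) TimeUnit/MILLISECONDS))
```

Resetting the `running?` atom to `false` lets the poll loop exit after its current sleep. A real implementation would also need to handle claim failures and interruption.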
Local in-process job queue with background polling from job-manager.
The queue batches job claims from the database into a local buffer and
distributes them to workers via take-job!. A background daemon thread
polls job-manager continuously, refetching whenever the buffer depth
drops below the configured threshold.
Typical usage (worker-pool calls create-queue once per pool):
  (def q (-> (queue/create-queue sys worker-id {:size 50})
             queue/start!))

  ;; Worker thread loop
  (when-let [job (queue/take-job! q 2000)]
    (try
      (execute job)
      (job-manager/complete-jobs sys (queue/worker-id q) [job] elapsed)
      (catch Exception e
        (job-manager/fail-jobs sys (queue/worker-id q)
                               [{:job job :error e}] elapsed))))

  ;; On shutdown
  (let [undispatched (queue/stop! q)]
    ;; undispatched jobs remain locked in DB; they will be recovered
    ;; by the ResetOverdueJobs rule after config.lock_timeout.
    undispatched)
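The usage above shows `stop!` returning the jobs that were buffered but never handed to a worker. How it collects them is not stated here. One way to do it, assuming the buffer is a `LinkedBlockingQueue`, is `drainTo`. The helper name `drain-undispatched!` is hypothetical.

```clojure
(import '(java.util ArrayList)
        '(java.util.concurrent LinkedBlockingQueue))

(defn drain-undispatched!
  "Removes every job still held in `buffer` and returns them as a vector,
  in queue order. Hypothetical helper, not part of the documented API."
  [^LinkedBlockingQueue buffer]
  (let [drained (ArrayList.)]
    (.drainTo buffer drained)
    (vec drained)))
```

The returned jobs are only removed from the in-process buffer. As noted above, their database locks remain until the ResetOverdueJobs rule recovers them.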
Malli schemas for queue component.