Local in-process job queue with background polling from job-manager.
The queue batches job claims from the database into a local buffer and distributes them to workers via take-job!. A background daemon thread polls job-manager continuously, refetching whenever the buffer depth drops below the configured threshold.
Typical usage (worker-pool calls create-queue once per pool):
(def q (-> (queue/create-queue sys worker-id {:size 50}) queue/start!))
;; Worker thread loop (when-let [job (queue/take-job! q 2000)] (try (execute job) (job-manager/complete-jobs sys (queue/worker-id q) [job] elapsed) (catch Exception e (job-manager/fail-jobs sys (queue/worker-id q) [{:job job :error e}] elapsed))))
;; On shutdown (let [undispatched (queue/stop! q)] ;; undispatched jobs remain locked in DB; they will be recovered ;; by the ResetOverdueJobs rule after config.lock_timeout.
Local in-process job queue with background polling from job-manager.
The queue batches job claims from the database into a local buffer and
distributes them to workers via take-job!. A background daemon thread
polls job-manager continuously, refetching whenever the buffer depth
drops below the configured threshold.
Typical usage (worker-pool calls create-queue once per pool):
(def q (-> (queue/create-queue sys worker-id {:size 50})
queue/start!))
;; Worker thread loop
(when-let [job (queue/take-job! q 2000)]
(try
(execute job)
(job-manager/complete-jobs sys (queue/worker-id q) [job] elapsed)
(catch Exception e
(job-manager/fail-jobs sys (queue/worker-id q)
[{:job job :error e}] elapsed))))
;; On shutdown
(let [undispatched (queue/stop! q)]
;; undispatched jobs remain locked in DB; they will be recovered
;; by the ResetOverdueJobs rule after config.lock_timeout.(create-queue job-system worker-id)(create-queue job-system worker-id config)Creates a local queue. Call start! to begin polling. config keys: :size, :ttl-ms, :refetch-threshold, :poll-interval-ms, :refetch-delay-ms, :task-identifiers, :forbidden-flags.
Creates a local queue. Call start! to begin polling. config keys: :size, :ttl-ms, :refetch-threshold, :poll-interval-ms, :refetch-delay-ms, :task-identifiers, :forbidden-flags.
(depth queue)Returns the number of jobs currently buffered locally.
Returns the number of jobs currently buffered locally.
(offer-jobs! queue jobs)Places jobs directly into the buffer without DB interaction. Jobs inserted this way bypass history tracking; the correlation-id-key required by complete-jobs and fail-jobs is not set, so job_history rows will not be updated from 'started' status when these jobs complete or fail. Use for testing only, or for re-dispatch where history tracking is not required.
Places jobs directly into the buffer without DB interaction. Jobs inserted this way bypass history tracking; the correlation-id-key required by complete-jobs and fail-jobs is not set, so job_history rows will not be updated from 'started' status when these jobs complete or fail. Use for testing only, or for re-dispatch where history tracking is not required.
(running? queue)Returns true if the background polling loop is active.
Returns true if the background polling loop is active.
(start! queue)Starts the background polling loop. Throws if already running. Returns queue.
Starts the background polling loop. Throws if already running. Returns queue.
(stats queue)Returns a snapshot of queue operational metrics. Keys: :fetched, :dispatched, :stale-dropped, :refetch-count, :errors.
Returns a snapshot of queue operational metrics. Keys: :fetched, :dispatched, :stale-dropped, :refetch-count, :errors.
(stop! queue)(stop! queue join-timeout-ms)Stops polling, drains the buffer, and returns un-dispatched jobs. join-timeout-ms controls how long to wait for the poll thread (default 5000).
Stops polling, drains the buffer, and returns un-dispatched jobs. join-timeout-ms controls how long to wait for the poll thread (default 5000).
(take-job! queue timeout-ms)Blocks until a job is available or timeout-ms elapses. Entries older than config :ttl-ms are discarded and not returned. Returns the job map or nil on timeout.
Blocks until a job is available or timeout-ms elapses. Entries older than config :ttl-ms are discarded and not returned. Returns the job map or nil on timeout.
(worker-id queue)Returns the worker-id used for database job locking. Pass this to job-manager/complete-jobs and job-manager/fail-jobs.
Returns the worker-id used for database job locking. Pass this to job-manager/complete-jobs and job-manager/fail-jobs.
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |