Liking cljdoc? Tell your friends :D

dev.skivi.queue.interface

Local in-process job queue with background polling from job-manager.

The queue batches job claims from the database into a local buffer and distributes them to workers via take-job!. A background daemon thread polls job-manager continuously, refetching whenever the buffer depth drops below the configured threshold.

Typical usage (worker-pool calls create-queue once per pool):

(def q (-> (queue/create-queue sys worker-id {:size 50}) queue/start!))

;; Worker thread loop (when-let [job (queue/take-job! q 2000)] (try (execute job) (job-manager/complete-jobs sys (queue/worker-id q) [job] elapsed) (catch Exception e (job-manager/fail-jobs sys (queue/worker-id q) [{:job job :error e}] elapsed))))

;; On shutdown (let [undispatched (queue/stop! q)] ;; undispatched jobs remain locked in DB; they will be recovered ;; by the ResetOverdueJobs rule after config.lock_timeout.

Local in-process job queue with background polling from job-manager.

The queue batches job claims from the database into a local buffer and
distributes them to workers via take-job!. A background daemon thread
polls job-manager continuously, refetching whenever the buffer depth
drops below the configured threshold.

Typical usage (worker-pool calls create-queue once per pool):

  (def q (-> (queue/create-queue sys worker-id {:size 50})
             queue/start!))

  ;; Worker thread loop
  (when-let [job (queue/take-job! q 2000)]
    (try
      (execute job)
      (job-manager/complete-jobs sys (queue/worker-id q) [job] elapsed)
      (catch Exception e
        (job-manager/fail-jobs sys (queue/worker-id q)
                               [{:job job :error e}] elapsed))))

  ;; On shutdown
  (let [undispatched (queue/stop! q)]
    ;; undispatched jobs remain locked in DB; they will be recovered
    ;; by the ResetOverdueJobs rule after config.lock_timeout.
raw docstring

create-queueclj

(create-queue job-system worker-id)
(create-queue job-system worker-id config)

Creates a local queue. Call start! to begin polling. config keys: :size, :ttl-ms, :refetch-threshold, :poll-interval-ms, :refetch-delay-ms, :task-identifiers, :forbidden-flags.

Creates a local queue. Call start! to begin polling.
config keys: :size, :ttl-ms, :refetch-threshold, :poll-interval-ms,
:refetch-delay-ms, :task-identifiers, :forbidden-flags.
sourceraw docstring

depthclj

(depth queue)

Returns the number of jobs currently buffered locally.

Returns the number of jobs currently buffered locally.
sourceraw docstring

offer-jobs!clj

(offer-jobs! queue jobs)

Places jobs directly into the buffer without DB interaction. Jobs inserted this way bypass history tracking; the correlation-id-key required by complete-jobs and fail-jobs is not set, so job_history rows will not be updated from 'started' status when these jobs complete or fail. Use for testing only, or for re-dispatch where history tracking is not required.

Places jobs directly into the buffer without DB interaction.
Jobs inserted this way bypass history tracking; the correlation-id-key
required by complete-jobs and fail-jobs is not set, so job_history rows
will not be updated from 'started' status when these jobs complete or fail.
Use for testing only, or for re-dispatch where history tracking is not required.
sourceraw docstring

running?clj

(running? queue)

Returns true if the background polling loop is active.

Returns true if the background polling loop is active.
sourceraw docstring

start!clj

(start! queue)

Starts the background polling loop. Throws if already running. Returns queue.

Starts the background polling loop. Throws if already running. Returns queue.
sourceraw docstring

statsclj

(stats queue)

Returns a snapshot of queue operational metrics. Keys: :fetched, :dispatched, :stale-dropped, :refetch-count, :errors.

Returns a snapshot of queue operational metrics.
Keys: :fetched, :dispatched, :stale-dropped, :refetch-count, :errors.
sourceraw docstring

stop!clj

(stop! queue)
(stop! queue join-timeout-ms)

Stops polling, drains the buffer, and returns un-dispatched jobs. join-timeout-ms controls how long to wait for the poll thread (default 5000).

Stops polling, drains the buffer, and returns un-dispatched jobs.
join-timeout-ms controls how long to wait for the poll thread (default 5000).
sourceraw docstring

take-job!clj

(take-job! queue timeout-ms)

Blocks until a job is available or timeout-ms elapses. Entries older than config :ttl-ms are discarded and not returned. Returns the job map or nil on timeout.

Blocks until a job is available or timeout-ms elapses.
Entries older than config :ttl-ms are discarded and not returned.
Returns the job map or nil on timeout.
sourceraw docstring

worker-idclj

(worker-id queue)

Returns the worker-id used for database job locking. Pass this to job-manager/complete-jobs and job-manager/fail-jobs.

Returns the worker-id used for database job locking.
Pass this to job-manager/complete-jobs and job-manager/fail-jobs.
sourceraw docstring

cljdoc builds & hosts documentation for Clojure/Script libraries

Keyboard shortcuts
Ctrl+kJump to recent docs
Move to previous article
Move to next article
Ctrl+/Jump to the search field
× close