Job lifecycle management: the JobEnqueue and WorkerExecution surfaces.
The system map passed to all functions must contain:
:pool - HikariCP DataSource (from the database component)
:validator - PayloadValidator (from the validation component)
Typical usage:
;; Build the system
(def sys {:pool pool :validator validator})
;; Enqueue
(add-job sys "send-email" {:to "x@y.com"} {:queue-name "email"})
;; Worker loop
(let [jobs (get-jobs sys worker-id {:batch-size 5})]
  (doseq [job jobs]
    (try
      (run-task! job)
      (complete-jobs sys worker-id [job] elapsed-ms)
      (catch Exception e
        (fail-jobs sys worker-id [{:job job :error e}] elapsed-ms)))))
(add-job system task-identifier payload)
(add-job system task-identifier payload opts)
Validates the payload, then adds a single job to the queue. Returns the created job.
(add-jobs system job-specs)
Validates all payloads, then adds multiple jobs atomically. Returns the created jobs.
(complete-jobs system worker-id jobs execution-time-ms)
Marks jobs as completed, removes them from the queue, and updates history. jobs must be the enriched maps returned by get-jobs. execution-time-ms is the wall-clock duration of job execution in milliseconds.
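The usage example passes elapsed-ms without showing where it comes from. A minimal sketch of one way to measure it follows. The `timed` helper is hypothetical and not part of this library, and `sys`, `worker-id`, and `run-task!` are the same placeholders as in the usage example.

```clojure
(defn timed
  "Calls the zero-argument function f and returns a map with :elapsed-ms
  plus either :result or :error. Hypothetical helper, not a library function."
  [f]
  (let [start      (System/nanoTime)
        ;; Wall-clock milliseconds since start, truncated to a whole number.
        elapsed-ms #(quot (- (System/nanoTime) start) 1000000)]
    (try
      {:result (f) :elapsed-ms (elapsed-ms)}
      (catch Exception e
        {:error e :elapsed-ms (elapsed-ms)}))))

(comment
  ;; Sketch of a worker loop built on timed (requires a real system map).
  (doseq [job (get-jobs sys worker-id {:batch-size 5})]
    (let [{:keys [error elapsed-ms]} (timed #(run-task! job))]
      (if error
        (fail-jobs sys worker-id [{:job job :error error}] elapsed-ms)
        (complete-jobs sys worker-id [job] elapsed-ms)))))
```

Unlike the usage example, this variant only catches exceptions thrown by the task itself, so an exception raised by complete-jobs is not reported as a job failure. Whether that is the desired behavior depends on the application.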
::correlation-id is the key embedded in job maps returned by get-jobs. It contains the UUID correlating the job execution to its history record. Pass the entire job map to complete-jobs, fail-jobs, or report-partial-success; do not inspect this key directly.
(fail-jobs system worker-id job-errors execution-time-ms)
Records job failures, schedules retries with exponential backoff, and updates history. job-errors is a sequence of {:job job :error throwable-or-string} maps. The :job value in each map must be the enriched map returned by get-jobs.
(force-unlock-jobs system)
(force-unlock-jobs system worker-ids)
Clears locks on locked jobs. With no worker-ids, unlocks all locked jobs. With worker-ids, unlocks only jobs held by those workers.
(force-unlock-queues system)
(force-unlock-queues system queue-names)
Clears locks on job queues. With no queue-names, unlocks all locked queues. With queue-names, unlocks only those queues.
(gc-job-history system)
Removes expired job history records. Returns the count of deleted rows.
(gc-job-queues system)
Removes empty, unlocked job queues. Returns the count of deleted queues.
(gc-task-identifiers system)
(gc-task-identifiers system opts)
Removes task identifiers that are not referenced by any job and are older than the retention period. Returns the count of deleted rows.
(get-jobs system worker-id)
(get-jobs system worker-id opts)
Retrieves and locks available jobs for worker-id. Each returned job map contains a ::correlation-id key required by complete-jobs, fail-jobs, and report-partial-success to update history records.
opts keys:
:task-identifiers - restrict to these task types
:forbidden-flags - skip jobs carrying any of these flags
:batch-size - max jobs to claim (default 1)
:history-retention - SQL interval string for history expiry (default '30 days')
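To make the documented defaults concrete, the sketch below models how a caller's opts combine with them. The `default-get-jobs-opts` map and `effective-opts` function are hypothetical illustrations, not library code, and the use of a vector of strings for :task-identifiers is an assumption about the expected shape.

```clojure
(def default-get-jobs-opts
  ;; Defaults as stated in the get-jobs docstring.
  {:batch-size 1
   :history-retention "30 days"})

(defn effective-opts
  "Merges caller-supplied opts over the documented defaults.
  Hypothetical helper for illustration only."
  [opts]
  (merge default-get-jobs-opts opts))

;; A worker that claims up to five send-email jobs per call.
(effective-opts {:task-identifiers ["send-email"] :batch-size 5})
```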
(permanently-fail-jobs system job-ids reason)
Sets jobs to exhausted status and records reason as last-error. Unlocks any named queues held by the affected jobs.
(refill-rate-limits system)
Refills tokens for all rate limits whose window has expired.
(register-rate-limit system key capacity interval)
Registers or updates a rate limit. capacity is the maximum number of tokens per interval. interval is a SQL interval string (e.g. "1 minute").
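The capacity-per-interval model described by register-rate-limit and refill-rate-limits can be illustrated with a small in-memory sketch. This is not the library's implementation, which presumably works against the database. The map keys (:capacity, :interval-ms, :tokens, :window-start) and both function names are assumptions made for the example.

```clojure
(defn refill
  "Resets :tokens to :capacity once the current window has expired.
  Pure, in-memory model; all keys are hypothetical."
  [{:keys [capacity interval-ms window-start] :as limit} now-ms]
  (if (>= (- now-ms window-start) interval-ms)
    (assoc limit :tokens capacity :window-start now-ms)
    limit))

(defn try-acquire
  "Returns [allowed? updated-limit], consuming one token when allowed."
  [limit now-ms]
  (let [{:keys [tokens] :as refilled} (refill limit now-ms)]
    (if (pos? tokens)
      [true (update refilled :tokens dec)]
      [false refilled])))

;; Two tokens per minute, window starting at t = 0 ms.
(def email-limit
  {:capacity 2 :interval-ms 60000 :tokens 2 :window-start 0})
```

With `email-limit`, two calls to `try-acquire` within the first minute succeed, a third is refused, and a call at or after 60000 ms succeeds again because the window has expired.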
(replay-failed-jobs system criteria)
Creates new jobs from failed history records matching the criteria map. Supports the :from and :to (instants), :task-identifier, and :max-attempts keys.
(report-partial-success system worker-id job partial-results execution-time-ms)
Records partial job success: reschedules the job with exponential backoff and records partial_success status in history. job must be the enriched map returned by get-jobs.
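Both fail-jobs and report-partial-success reschedule with exponential backoff, but the exact formula is not documented here. The sketch below shows a common shape for such a schedule, a doubling delay with an upper cap. The `backoff-ms` function, the 1 second base, and the 1 hour cap are all assumptions, not values taken from the library.

```clojure
(defn backoff-ms
  "Delay in milliseconds before retry number `attempt` (1-based).
  Doubles on each attempt and is capped at :cap-ms.
  Hypothetical formula; the library's actual schedule may differ."
  [attempt {:keys [base-ms cap-ms]
            :or   {base-ms 1000 cap-ms 3600000}}]
  ;; Clamp the exponent so the shift cannot overflow a long.
  (let [exponent (min (max (dec attempt) 0) 30)]
    (min cap-ms (* base-ms (bit-shift-left 1 exponent)))))

;; Delays for the first four attempts with the default options.
(mapv #(backoff-ms % {}) [1 2 3 4])
;; => [1000 2000 4000 8000]
```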
(reschedule-jobs system job-ids opts)
Updates run-at, priority, or max-attempts for the given job-ids.
(reset-locked-jobs system)
(reset-locked-jobs system opts)
Resets jobs whose locks have exceeded the configurable timeout. Returns the count of reset jobs.