Configuration management interface with malli schema metadata.
All functions have :malli/schema metadata for runtime validation. To enable instrumentation:
(require '[malli.instrument :as mi])
(mi/instrument!)   ; instruments all functions with :malli/schema metadata
(mi/unstrument!)   ; removes instrumentation
Malli schemas for configuration validation
Malli schemas for database component.
Job execution history: database persistence and in-memory ring buffer.
The ring buffer retains the last N terminal records (completed, failed, partial_success). record-start! writes to the DB only; the terminal record-* functions write to the DB and push to the buffer. observe! adds directly to the buffer without any DB interaction - useful for tests and for wiring up monitoring-event subscribers.
Thread safety: the ring buffer state lives in an atom updated with swap!.
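The buffer behaviour described above (a bounded set of recent records held in an atom and updated with swap!) can be sketched roughly as follows. This is an illustration only, not the component's implementation: make-buffer, push!, and recent-records are hypothetical names, and the shape of the state map is an assumption.

```clojure
;; Hypothetical sketch of an atom-backed ring buffer. Names and state shape
;; are assumptions, not the job-history implementation.
(defn make-buffer
  "Returns an atom holding an empty buffer that keeps at most `capacity` records."
  [capacity]
  (atom {:capacity capacity :items []}))

(defn push!
  "Appends `record`, dropping the oldest records once capacity is exceeded.
  The whole update runs inside a single swap!, so concurrent pushes neither
  lose nor duplicate a record."
  [buffer record]
  (swap! buffer
         (fn [{:keys [capacity items] :as state}]
           (let [items' (conj items record)
                 excess (- (count items') capacity)]
             (assoc state :items (if (pos? excess)
                                   (into [] (subvec items' excess))
                                   items'))))))

(defn recent-records
  "Returns up to `n` records, newest first."
  [buffer n]
  (vec (take n (rseq (:items @buffer)))))
```

Keeping the capacity and the items in one map means a single swap! covers both the append and the eviction, which is what makes the update safe without locks.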
Job execution history: DB persistence and in-memory ring buffer of recent executions.
A HistoryStore wraps a database pool and an atom-backed ring buffer. Create one
store per system and share it across components that record or query job history.
Typical usage:
;; Create the store (pool from the database component)
(def store (job-history/create-store pool {:buffer-size 500}))
;; In the worker execution path
(job-history/record-start! store job worker-id correlation-id)
;; ... job runs ...
(job-history/record-completion! store (:id job) worker-id correlation-id elapsed-ms)
;; Query DB history
(job-history/get-by-job-id store job-id)
(job-history/query store {:from (java.util.Date.) :status "failed" :limit 50})
;; Inspect recent executions from the ring buffer (no DB)
(job-history/recent store 20)
;; Wire up to monitoring events so the ring buffer is populated without
;; routing writes through this component:
(monitoring/on emitter :job/completed
  (fn [{:keys [data]}] (job-history/observe! store data)))
;; Scheduled maintenance
(job-history/expire! store)
Ring buffer notes:
record-start! writes to the DB only - started records are not buffered.
record-completion!, record-failure!, and record-partial-success! write to
the DB and push the terminal record into the ring buffer.
observe! adds directly to the buffer without any DB interaction.
Malli schemas for the job-history component.
Job lifecycle management: the JobEnqueue and WorkerExecution surfaces.
The system map passed to all functions must contain:
  :pool      - HikariCP DataSource (from the database component)
  :validator - PayloadValidator (from the validation component)
Typical usage:
;; Build the system
(def sys {:pool pool :validator validator})
;; Enqueue
(add-job sys "send-email" {:to "x@y.com"} {:queue-name "email"})
;; Worker loop
(let [jobs (get-jobs sys worker-id {:batch-size 5})]
  (doseq [job jobs]
    (try
      (run-task! job)
      (complete-jobs sys worker-id [job] elapsed-ms)
      (catch Exception e
        (fail-jobs sys worker-id [{:job job :error e}] elapsed-ms)))))
Malli schemas for job-manager component.
Background maintenance scheduler.
Runs two classes of work on separate cadences:
  Short interval (maintenance-interval-ms, default 60 s):
    reset-locked-jobs  - frees jobs orphaned by crashed workers
    refill-rate-limits - refills token buckets whose window has expired
  Cron schedule (schedule, default '0 3 * * *'):
    GC tasks listed in :tasks config (gc-task-identifiers, gc-job-queues,
    gc-job-history).
The GC schedule is evaluated on each maintenance tick; the first tick
after a scheduled time triggers the run. Last-run time is tracked in memory
and resets to nil on restart (causing GC to wait for the next scheduled slot).
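The tick-driven GC check described above can be sketched as below. This is a simplified illustration under stated assumptions: it models a fixed daily slot (hour:00) rather than parsing a cron expression, and it treats a missing last-run time as "use the process start time as the baseline", which is one way to get the documented wait-for-the-next-slot behaviour after a restart. The names latest-slot and gc-due? are hypothetical.

```clojure
;; Hypothetical sketch: decide on each maintenance tick whether GC should run.
;; Assumes a daily slot at hour:00 instead of a full cron expression.
(import '(java.time LocalTime ZoneOffset ZonedDateTime))

(defn latest-slot
  "Most recent daily slot (hour:00) at or before `now`."
  [^ZonedDateTime now hour]
  (let [today (ZonedDateTime/of (.toLocalDate now)
                                (LocalTime/of hour 0)
                                (.getZone now))]
    (if (.isAfter today now)
      (.minusDays today 1)
      today)))

(defn gc-due?
  "True when a scheduled slot has passed since `baseline`, where `baseline`
  is the last GC run or, after a restart, the process start time."
  [^ZonedDateTime baseline ^ZonedDateTime now hour]
  (.isAfter (latest-slot now hour) baseline))
```

With a 60 s tick and a 03:00 slot, the first tick at or after 03:00 finds the slot newer than the baseline and triggers the run; ticks after that see a baseline later than the slot and do nothing until the next day.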
Maintenance scheduler: reset-locked-jobs and refill-rate-limits run on a short interval; GC tasks run on a configurable cron schedule.
Malli schemas for the maintenance component.
Malli schemas for migration component.
Event emitter: fan-out delivery to registered handlers with a ring buffer.
Handlers registered with :all receive every event regardless of type. Handlers registered with a specific keyword receive only that event type. Handler errors are caught, counted in :dropped stats, and never propagate.
Thread safety: all mutable state lives in atoms updated with swap!/reset!. emit! and handler registration are safe to call concurrently.
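The delivery rules above (:all handlers see everything, keyword handlers see one type, handler errors are counted and swallowed) can be sketched as follows. This is not the monitoring implementation: make-emitter, register!, and emit-event! are hypothetical names, and the state layout and the :emitted counter are assumptions.

```clojure
;; Hypothetical sketch of fan-out delivery with error isolation.
(defn make-emitter []
  (atom {:handlers {}                      ; handler-id -> {:type kw :f fn}
         :next-id  0
         :stats    {:emitted 0 :dropped 0}}))

(defn register!
  "Registers `f` for `event-type` (or :all) and returns the new handler id."
  [emitter event-type f]
  (let [state (swap! emitter
                     (fn [{:keys [next-id] :as s}]
                       (-> s
                           (assoc-in [:handlers next-id] {:type event-type :f f})
                           (update :next-id inc))))]
    (dec (:next-id state))))

(defn emit-event!
  "Delivers the event to every matching handler. A throwing handler is
  counted in :dropped and affects neither other handlers nor the caller."
  [emitter event-type data]
  (let [event {:type event-type :data data}]
    (swap! emitter update-in [:stats :emitted] inc)
    (doseq [{handler-type :type f :f} (vals (:handlers @emitter))
            :when (or (= handler-type :all) (= handler-type event-type))]
      (try
        (f event)
        (catch Throwable _
          (swap! emitter update-in [:stats :dropped] inc))))
    nil))
```

Handlers are read from one snapshot of the atom per emit, so registering or removing a handler concurrently cannot disturb a delivery that is already in progress.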
Event emitter with fan-out delivery, a ring buffer, and operational stats.
Emitters are plain maps with atom-backed state. Create one per system
and pass it to components that need to emit events (worker-pool, queue, etc.).
Typical usage:
;; Create from the :events section of MonitoringConfig
(def emitter (monitoring/create-emitter {:enabled true :buffer-size 500}))
;; Register a structured logging handler for all events
(monitoring/on emitter :all
  (fn [{:keys [type data]}]
    (log/info "event" (assoc data :event/type type))))
;; Register a handler for a specific event type
(def hid (monitoring/on emitter :job/claimed
           (fn [event] (metrics/increment! :jobs-claimed))))
;; Emit from a component
(monitoring/emit! emitter :job/claimed {:job-id id :worker-id wid})
;; Deregister when no longer needed
(monitoring/off emitter hid)
;; Inspect recent events (ring buffer snapshot)
(monitoring/events emitter)
;; In tests: use noop-emitter to silence all monitoring
(monitoring/noop-emitter)
;; In tests: use collecting-emitter to assert events were emitted
(let [em (monitoring/collecting-emitter)]
  (do-something em)
  (is (some #(= :job/claimed (:type %)) (monitoring/events em))))
Standard event types emitted by skivi components:
  :job/claimed         - worker claimed a job from the database
  :job/completed       - worker completed a job successfully
  :job/failed          - worker failed a job (retry eligible)
  :job/exhausted       - worker failed a job at max_attempts
  :job/partial-success - worker reported partial success
  :queue/locked        - a named job queue was locked by a worker
  :queue/unlocked      - a named job queue was released
  :cron/fired          - a crontab entry scheduled a job
  :worker/error        - an unhandled exception occurred in a worker
  :pool/start          - the worker pool started
  :pool/stop           - the worker pool stopped
Malli schemas for the monitoring component.
In-process job buffer with background polling from job-manager.
Jobs are claimed from the database in batches and held in a LinkedBlockingQueue until a worker calls take-job!. The poll loop runs as a daemon thread and automatically refetches when the buffer depth drops below the configured threshold.
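The buffering scheme above can be sketched with a LinkedBlockingQueue as shown below. This is a rough illustration, not the queue component's code: refill-if-low! and take-job are hypothetical names, claim-fn stands in for whatever claims a batch from the database, and the refill is called explicitly here, whereas the component runs it from a daemon poll thread.

```clojure
;; Hypothetical sketch of a local job buffer with threshold-based refetch.
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

(defn refill-if-low!
  "Claims another batch via `claim-fn` when the buffer depth is below
  `threshold`. `claim-fn` takes a batch size and returns a seq of jobs."
  [^LinkedBlockingQueue buffer threshold batch-size claim-fn]
  (when (< (.size buffer) threshold)
    (doseq [job (claim-fn batch-size)]
      (.put buffer job))))

(defn take-job
  "Waits up to `timeout-ms` for a job; returns nil when none arrives."
  [^LinkedBlockingQueue buffer timeout-ms]
  (.poll buffer (long timeout-ms) TimeUnit/MILLISECONDS))
```

Returning nil on timeout rather than blocking forever lets a worker loop re-check a shutdown flag between attempts.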
Local in-process job queue with background polling from job-manager.
The queue batches job claims from the database into a local buffer and
distributes them to workers via take-job!. A background daemon thread
polls job-manager continuously, refetching whenever the buffer depth
drops below the configured threshold.
Typical usage (worker-pool calls create-queue once per pool):
(def q (-> (queue/create-queue sys worker-id {:size 50})
           queue/start!))
;; Worker thread loop
(when-let [job (queue/take-job! q 2000)]
  (try
    (execute job)
    (job-manager/complete-jobs sys (queue/worker-id q) [job] elapsed)
    (catch Exception e
      (job-manager/fail-jobs sys (queue/worker-id q)
                             [{:job job :error e}] elapsed))))
;; On shutdown
(let [undispatched (queue/stop! q)]
  ;; undispatched jobs remain locked in DB; they will be recovered
  ;; by the ResetOverdueJobs rule after config.lock_timeout.
  undispatched)
Malli schemas for queue component.
Cron scheduler: evaluates crontab entries and enqueues jobs when due.
Cron state (last_execution) is persisted to known_crontabs so that the scheduler survives process restarts without double-firing. Jobs are enqueued with job_key_mode = unsafe_dedupe, so multiple scheduler instances running in parallel will not produce duplicate queue entries.
The background polling thread evaluates all registered entries on each tick and fires any whose next_run_at <= now. The poll interval is coarse (default 10 s) because cron granularity is one minute.
Relies on the ReadableColumn protocol extensions defined in database.core to return java.time.Instant values for TIMESTAMPTZ columns.
Cron scheduler that enqueues jobs via job-manager when schedules are due.
Each cron entry has an identifier (used as both task-identifier and job-key),
a standard UNIX 5-field cron expression, and optional per-entry job creation
overrides. Cron state (last_execution) is persisted to skivi.known_crontabs
so the scheduler survives restarts without double-firing.
Jobs are enqueued with job_key_mode = unsafe_dedupe: if the previous run's
job is still queued or in flight, the new enqueue is a no-op.
Typical usage:
(def crontabs
  [{:identifier "daily-report"
    :schedule   "0 2 * * *"
    :spec       {:queue-name "maintenance" :max-attempts 3}}
   {:identifier "hourly-cleanup"
    :schedule   "0 * * * *"}])
(def scheduler
  (-> (scheduler/create-scheduler sys crontabs emitter {:timezone "Europe/London"})
      scheduler/start!))
;; On application shutdown
(scheduler/stop! scheduler 5000)
Standard events emitted:
  :cron/fired - a cron entry fired and a job was enqueued; :identifier in data
Malli schemas for the scheduler component.
Unified entry point that wires all skivi components into a single system map.
Typical usage:
(def system
  (-> (skivi/create-system config {"send-email" handle-send-email})
      (skivi/start!)))
(skivi/add-job system "send-email" {:to "x@y.com"})
;; On shutdown
(skivi/stop! system)
Job payload validation interface supporting malli schemas, clojure.spec specs,
and composite chains of validators.
Typical usage:
;; From config (malli schemas or clojure.spec keywords in [:schema :job-schemas])
(def v (create-validator config))
;; Directly
(def v (malli-validator {:send-email [:map [:to :string] [:subject :string]]}))
;; Validate
(validate-payload v :send-email {:to "x@y.com" :subject "Hi"})
To enable malli instrumentation of this namespace:
(require '[malli.instrument :as mi])
(mi/instrument!)
(mi/unstrument!)
Thread-pool-based worker pool that claims jobs from the queue and dispatches them to registered task functions.
All worker threads share one queue and one worker-id, so jobs are claimed atomically in batches and distributed via take-job!. The pool does not own the job-system or emitter; callers supply those at creation time.
Task functions receive a context map with :job, :job-system, and :worker-id. They signal outcomes as follows:
  • return any value (including nil) - job succeeds (WorkerCompletesJob)
  • return (partial-success m) - partial success (WorkerReportsPartialSuccess)
  • throw any Throwable - job fails (WorkerFailsJob)
Events emitted:
  :pool/start          - pool is started (includes :concurrency)
  :pool/stop           - pool has stopped (includes :forced? when force-stop!)
  :job/completed       - task fn returned normally
  :job/failed          - task fn threw; job retry scheduled
  :job/exhausted       - task fn threw on the final allowed attempt
  :job/partial-success - task fn returned a partial-success value
  :worker/error        - infrastructure exception outside task execution
Concurrent worker pool that claims jobs from the queue and executes task handlers.
All workers share a single local queue backed by job-manager. The pool uses one
worker-id for all job claims; see worker-id to retrieve it.
Task functions are plain Clojure functions keyed by task-identifier string:
(def tasks
  {"send-email"   (fn [{:keys [job]}] (send! (:payload job)))
   "resize-image" (fn [{:keys [job job-system worker-id]}] ...)})
A task fn receives a context map with:
  :job        - the full job map returned by job-manager/get-jobs
  :job-system - the {:pool :validator} system map
  :worker-id  - the pool's worker-id string
Task outcome is determined by the return value or exception:
• return any value (including nil) - job completes successfully
• return (partial-success m) - job reported as partial success
• throw any Throwable - job is failed (retry or exhaust)
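The three outcome rules above can be sketched as a small classifier. This is an illustration under assumptions, not the pool's code: how the library represents a partial-success value is not documented here, so the sketch uses a hypothetical PartialSuccess record with a hypothetical mark-partial-success constructor, and classify-task-run is likewise an invented name.

```clojure
;; Hypothetical sketch: classify a task run as completed, partial success,
;; or failed. The PartialSuccess record is an assumed representation.
(defrecord PartialSuccess [details])

(defn mark-partial-success
  "Stand-in for the library's partial-success; wraps `m` in a marker record."
  [m]
  (->PartialSuccess m))

(defn classify-task-run
  "Calls `task-fn` with the context map `ctx` and reports the outcome."
  [task-fn ctx]
  (try
    (let [result (task-fn ctx)]
      (if (instance? PartialSuccess result)
        {:outcome :partial-success :details (:details result)}
        {:outcome :completed}))
    (catch Throwable t          ; any Throwable counts as a failure
      {:outcome :failed :error t})))
```

For example, `(classify-task-run (fn [_] nil) {})` reports a completed job, since a nil return value still counts as success.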
Typical usage:
(def pool
  (-> (worker-pool/create-pool sys tasks emitter {:concurrency 4})
      worker-pool/start!))
;; On application shutdown
(worker-pool/stop! pool 15000)
Standard events emitted by the pool (via the supplied emitter):
  :pool/start          - pool started; :concurrency in data
  :pool/stop           - pool stopped; :forced? in data
  :job/completed       - task fn returned normally
  :job/failed          - task fn threw; retry scheduled
  :job/exhausted       - task fn threw on the final allowed attempt
  :job/partial-success - task fn returned a partial-success value
  :worker/error        - infrastructure exception outside task execution
Malli schemas for the worker-pool component.