Liking cljdoc? Tell your friends :D

dev.skivi.config.interface

Configuration management interface with malli schema metadata.

All functions have :malli/schema metadata for runtime validation. To enable instrumentation:

(require '[malli.instrument :as mi]) (mi/instrument!) ; instruments all functions with :malli/schema metadata (mi/unstrument!) ; removes instrumentation

Configuration management interface with malli schema metadata.

All functions have :malli/schema metadata for runtime validation.
To enable instrumentation:

  (require '[malli.instrument :as mi])
  (mi/instrument!)  ; instruments all functions with :malli/schema metadata
  (mi/unstrument!)  ; removes instrumentation
raw docstring

dev.skivi.database.schema

Malli schemas for database component.

Malli schemas for database component.
raw docstring

dev.skivi.job-history.core

Job execution history: database persistence and in-memory ring buffer.

The ring buffer retains the last N terminal records (completed, failed, partial_success). record-start! writes to the DB only; the terminal record-* functions write to the DB and push to the buffer. observe! adds directly to the buffer without any DB interaction - useful for tests and for wiring up monitoring-event subscribers.

Thread safety: the ring buffer state lives in an atom updated with swap!.

Job execution history: database persistence and in-memory ring buffer.

The ring buffer retains the last N terminal records (completed, failed,
partial_success). record-start! writes to the DB only; the terminal
record-* functions write to the DB and push to the buffer. observe! adds
directly to the buffer without any DB interaction - useful for tests and
for wiring up monitoring-event subscribers.

Thread safety: the ring buffer state lives in an atom updated with swap!.
raw docstring

dev.skivi.job-history.interface

Job execution history: DB persistence and in-memory ring buffer of recent executions.

A HistoryStore wraps a database pool and an atom-backed ring buffer. Create one store per system and share it across components that record or query job history.

Typical usage:

;; Create the store (pool from the database component) (def store (job-history/create-store pool {:buffer-size 500}))

;; In the worker execution path (job-history/record-start! store job worker-id correlation-id) ;; ... job runs ... (job-history/record-completion! store (:id job) worker-id correlation-id elapsed-ms)

;; Query DB history (job-history/get-by-job-id store job-id) (job-history/query store {:from (java.util.Date.) :status "failed" :limit 50})

;; Inspect recent executions from the ring buffer (no DB) (job-history/recent store 20)

;; Wire up to monitoring events so the ring buffer is populated without ;; routing writes through this component: (monitoring/on emitter :job/completed (fn [{:keys [data]}] (job-history/observe! store data)))

;; Scheduled maintenance (job-history/expire! store)

Ring buffer notes: record-start! writes to the DB only - started records are not buffered. record-completion!, record-failure!, and record-partial-success! write to the DB and push the terminal record into the ring buffer. observe! adds directly to the buffer without any DB interaction.

Job execution history: DB persistence and in-memory ring buffer of recent executions.

A HistoryStore wraps a database pool and an atom-backed ring buffer. Create one
store per system and share it across components that record or query job history.

Typical usage:

  ;; Create the store (pool from the database component)
  (def store (job-history/create-store pool {:buffer-size 500}))

  ;; In the worker execution path
  (job-history/record-start! store job worker-id correlation-id)
  ;; ... job runs ...
  (job-history/record-completion! store (:id job) worker-id correlation-id elapsed-ms)

  ;; Query DB history
  (job-history/get-by-job-id store job-id)
  (job-history/query store {:from (java.util.Date.) :status "failed" :limit 50})

  ;; Inspect recent executions from the ring buffer (no DB)
  (job-history/recent store 20)

  ;; Wire up to monitoring events so the ring buffer is populated without
  ;; routing writes through this component:
  (monitoring/on emitter :job/completed
                 (fn [{:keys [data]}] (job-history/observe! store data)))

  ;; Scheduled maintenance
  (job-history/expire! store)

Ring buffer notes:
  record-start! writes to the DB only - started records are not buffered.
  record-completion!, record-failure!, and record-partial-success! write to
  the DB and push the terminal record into the ring buffer.
  observe! adds directly to the buffer without any DB interaction.
raw docstring

dev.skivi.job-history.schema

Malli schemas for the job-history component.

Malli schemas for the job-history component.
raw docstring

dev.skivi.job-manager.interface

Job lifecycle management: the JobEnqueue and WorkerExecution surfaces.

The system map passed to all functions must contain: :pool - HikariCP DataSource (from the database component) :validator - PayloadValidator (from the validation component)

Typical usage:

;; Build the system (def sys {:pool pool :validator validator})

;; Enqueue (add-job sys "send-email" {:to "x@y.com"} {:queue-name "email"})

;; Worker loop (let [jobs (get-jobs sys worker-id {:batch-size 5})] (doseq [job jobs] (try (run-task! job) (complete-jobs sys worker-id [job] elapsed-ms) (catch Exception e (fail-jobs sys worker-id [{:job job :error e}] elapsed-ms)))))

Job lifecycle management: the JobEnqueue and WorkerExecution surfaces.

The system map passed to all functions must contain:
  :pool      - HikariCP DataSource (from the database component)
  :validator - PayloadValidator (from the validation component)

Typical usage:

  ;; Build the system
  (def sys {:pool pool :validator validator})

  ;; Enqueue
  (add-job sys "send-email" {:to "x@y.com"} {:queue-name "email"})

  ;; Worker loop
  (let [jobs (get-jobs sys worker-id {:batch-size 5})]
    (doseq [job jobs]
      (try
        (run-task! job)
        (complete-jobs sys worker-id [job] elapsed-ms)
        (catch Exception e
          (fail-jobs sys worker-id [{:job job :error e}] elapsed-ms)))))
raw docstring

dev.skivi.job-manager.schema

Malli schemas for job-manager component.

Malli schemas for job-manager component.
raw docstring

dev.skivi.maintenance.core

Background maintenance scheduler.

Runs two classes of work on separate cadences: Short interval (maintenance-interval-ms, default 60 s): reset-locked-jobs — frees jobs orphaned by crashed workers refill-rate-limits — refills token buckets whose window has expired

Cron schedule (schedule, default '0 3 * * *'): GC tasks listed in :tasks config (gc-task-identifiers, gc-job-queues, gc-job-history).

The GC schedule is evaluated on each maintenance tick; the first tick after a scheduled time triggers the run. Last-run time is tracked in memory and resets to nil on restart (causing GC to wait for the next scheduled slot).

Background maintenance scheduler.

Runs two classes of work on separate cadences:
  Short interval (maintenance-interval-ms, default 60 s):
    reset-locked-jobs — frees jobs orphaned by crashed workers
    refill-rate-limits — refills token buckets whose window has expired

  Cron schedule (schedule, default '0 3 * * *'):
    GC tasks listed in :tasks config (gc-task-identifiers, gc-job-queues,
    gc-job-history).

The GC schedule is evaluated on each maintenance tick; the first tick
after a scheduled time triggers the run. Last-run time is tracked in memory
and resets to nil on restart (causing GC to wait for the next scheduled slot).
raw docstring

dev.skivi.maintenance.interface

Maintenance scheduler: reset-locked-jobs and refill-rate-limits run on a short interval; GC tasks run on a configurable cron schedule.

Maintenance scheduler: reset-locked-jobs and refill-rate-limits run on a short
interval; GC tasks run on a configurable cron schedule.
raw docstring

dev.skivi.maintenance.schema

Malli schemas for the maintenance component.

Malli schemas for the maintenance component.
raw docstring

dev.skivi.migration.schema

Malli schemas for migration component.

Malli schemas for migration component.
raw docstring

dev.skivi.monitoring.core

Event emitter: fan-out delivery to registered handlers with a ring buffer.

Handlers registered with :all receive every event regardless of type. Handlers registered with a specific keyword receive only that event type. Handler errors are caught, counted in :dropped stats, and never propagate.

Thread safety: all mutable state lives in atoms updated with swap!/reset!. emit! and handler registration are safe to call concurrently.

Event emitter: fan-out delivery to registered handlers with a ring buffer.

Handlers registered with :all receive every event regardless of type.
Handlers registered with a specific keyword receive only that event type.
Handler errors are caught, counted in :dropped stats, and never propagate.

Thread safety: all mutable state lives in atoms updated with swap!/reset!.
emit! and handler registration are safe to call concurrently.
raw docstring

dev.skivi.monitoring.interface

Event emitter with fan-out delivery, a ring buffer, and operational stats.

Emitters are plain maps with atom-backed state. Create one per system and pass it to components that need to emit events (worker-pool, queue, etc.).

Typical usage:

;; Create from the :events section of MonitoringConfig (def emitter (monitoring/create-emitter {:enabled true :buffer-size 500}))

;; Register a structured logging handler for all events (monitoring/on emitter :all (fn [{:keys [type data]}] (log/info "event" (assoc data :event/type type))))

;; Register a handler for a specific event type (def hid (monitoring/on emitter :job/claimed (fn [event] (metrics/increment! :jobs-claimed))))

;; Emit from a component (monitoring/emit! emitter :job/claimed {:job-id id :worker-id wid})

;; Deregister when no longer needed (monitoring/off emitter hid)

;; Inspect recent events (ring buffer snapshot) (monitoring/events emitter)

;; In tests: use noop-emitter to silence all monitoring (monitoring/noop-emitter)

;; In tests: use collecting-emitter to assert events were emitted (let [em (monitoring/collecting-emitter)] (do-something em) (is (some #(= :job/claimed (:type %)) (monitoring/events em))))

Standard event types emitted by skivi components: :job/claimed - worker claimed a job from the database :job/completed - worker completed a job successfully :job/failed - worker failed a job (retry eligible) :job/exhausted - worker failed a job at max_attempts :job/partial-success - worker reported partial success :queue/locked - a named job queue was locked by a worker :queue/unlocked - a named job queue was released :cron/fired - a cron tab entry scheduled a job :worker/error - an unhandled exception occurred in a worker :pool/start - the worker pool started :pool/stop - the worker pool stopped

Event emitter with fan-out delivery, a ring buffer, and operational stats.

Emitters are plain maps with atom-backed state. Create one per system
and pass it to components that need to emit events (worker-pool, queue, etc.).

Typical usage:

  ;; Create from the :events section of MonitoringConfig
  (def emitter (monitoring/create-emitter {:enabled true :buffer-size 500}))

  ;; Register a structured logging handler for all events
  (monitoring/on emitter :all
                 (fn [{:keys [type data]}]
                   (log/info "event" (assoc data :event/type type))))

  ;; Register a handler for a specific event type
  (def hid (monitoring/on emitter :job/claimed
                           (fn [event] (metrics/increment! :jobs-claimed))))

  ;; Emit from a component
  (monitoring/emit! emitter :job/claimed {:job-id id :worker-id wid})

  ;; Deregister when no longer needed
  (monitoring/off emitter hid)

  ;; Inspect recent events (ring buffer snapshot)
  (monitoring/events emitter)

  ;; In tests: use noop-emitter to silence all monitoring
  (monitoring/noop-emitter)

  ;; In tests: use collecting-emitter to assert events were emitted
  (let [em (monitoring/collecting-emitter)]
    (do-something em)
    (is (some #(= :job/claimed (:type %)) (monitoring/events em))))

Standard event types emitted by skivi components:
  :job/claimed         - worker claimed a job from the database
  :job/completed       - worker completed a job successfully
  :job/failed          - worker failed a job (retry eligible)
  :job/exhausted       - worker failed a job at max_attempts
  :job/partial-success - worker reported partial success
  :queue/locked        - a named job queue was locked by a worker
  :queue/unlocked      - a named job queue was released
  :cron/fired          - a cron tab entry scheduled a job
  :worker/error        - an unhandled exception occurred in a worker
  :pool/start          - the worker pool started
  :pool/stop           - the worker pool stopped
raw docstring

dev.skivi.monitoring.schema

Malli schemas for the monitoring component.

Malli schemas for the monitoring component.
raw docstring

dev.skivi.queue.core

In-process job buffer with background polling from job-manager.

Jobs are claimed from the database in batches and held in a LinkedBlockingQueue until a worker calls take-job!. The poll loop runs as a daemon thread and automatically refetches when the buffer depth drops below the configured threshold.

In-process job buffer with background polling from job-manager.

Jobs are claimed from the database in batches and held in a
LinkedBlockingQueue until a worker calls take-job!. The poll loop runs
as a daemon thread and automatically refetches when the buffer depth
drops below the configured threshold.
raw docstring

dev.skivi.queue.interface

Local in-process job queue with background polling from job-manager.

The queue batches job claims from the database into a local buffer and distributes them to workers via take-job!. A background daemon thread polls job-manager continuously, refetching whenever the buffer depth drops below the configured threshold.

Typical usage (worker-pool calls create-queue once per pool):

(def q (-> (queue/create-queue sys worker-id {:size 50}) queue/start!))

;; Worker thread loop (when-let [job (queue/take-job! q 2000)] (try (execute job) (job-manager/complete-jobs sys (queue/worker-id q) [job] elapsed) (catch Exception e (job-manager/fail-jobs sys (queue/worker-id q) [{:job job :error e}] elapsed))))

;; On shutdown (let [undispatched (queue/stop! q)] ;; undispatched jobs remain locked in DB; they will be recovered ;; by the ResetOverdueJobs rule after config.lock_timeout.

Local in-process job queue with background polling from job-manager.

The queue batches job claims from the database into a local buffer and
distributes them to workers via take-job!. A background daemon thread
polls job-manager continuously, refetching whenever the buffer depth
drops below the configured threshold.

Typical usage (worker-pool calls create-queue once per pool):

  (def q (-> (queue/create-queue sys worker-id {:size 50})
             queue/start!))

  ;; Worker thread loop
  (when-let [job (queue/take-job! q 2000)]
    (try
      (execute job)
      (job-manager/complete-jobs sys (queue/worker-id q) [job] elapsed)
      (catch Exception e
        (job-manager/fail-jobs sys (queue/worker-id q)
                               [{:job job :error e}] elapsed))))

  ;; On shutdown
  (let [undispatched (queue/stop! q)]
    ;; undispatched jobs remain locked in DB; they will be recovered
    ;; by the ResetOverdueJobs rule after config.lock_timeout.
raw docstring

dev.skivi.queue.schema

Malli schemas for queue component.

Malli schemas for queue component.
raw docstring

dev.skivi.scheduler.core

Cron scheduler: evaluates crontab entries and enqueues jobs when due.

Cron state (last_execution) is persisted to known_crontabs so that the scheduler survives process restarts without double-firing. Jobs are enqueued with job_key_mode = unsafe_dedupe, so multiple scheduler instances running in parallel will not produce duplicate queue entries.

The background polling thread evaluates all registered entries on each tick and fires any whose next_run_at <= now. The poll interval is coarse (default 10 s) because cron granularity is one minute.

Relies on the ReadableColumn protocol extensions defined in database.core to return java.time.Instant values for TIMESTAMPTZ columns.

Cron scheduler: evaluates crontab entries and enqueues jobs when due.

Cron state (last_execution) is persisted to known_crontabs so that
the scheduler survives process restarts without double-firing. Jobs are
enqueued with job_key_mode = unsafe_dedupe, so multiple scheduler instances
running in parallel will not produce duplicate queue entries.

The background polling thread evaluates all registered entries on each tick
and fires any whose next_run_at <= now. The poll interval is coarse
(default 10 s) because cron granularity is one minute.

Relies on the ReadableColumn protocol extensions defined in database.core
to return java.time.Instant values for TIMESTAMPTZ columns.
raw docstring

dev.skivi.scheduler.interface

Cron scheduler that enqueues jobs via job-manager when schedules are due.

Each cron entry has an identifier (used as both task-identifier and job-key), a standard UNIX 5-field cron expression, and optional per-entry job creation overrides. Cron state (last_execution) is persisted to skivi.known_crontabs so the scheduler survives restarts without double-firing.

Jobs are enqueued with job_key_mode = unsafe_dedupe: if the previous run's job is still queued or in flight, the new enqueue is a no-op.

Typical usage:

(def crontabs [{:identifier "daily-report" :schedule "0 2 * * *" :spec {:queue-name "maintenance" :max-attempts 3}} {:identifier "hourly-cleanup" :schedule "0 * * * *"}])

(def scheduler (-> (scheduler/create-scheduler sys crontabs emitter {:timezone "Europe/London"}) scheduler/start!))

;; On application shutdown (scheduler/stop! scheduler 5000)

Standard events emitted: :cron/fired - a cron entry fired and a job was enqueued; :identifier in data

Cron scheduler that enqueues jobs via job-manager when schedules are due.

Each cron entry has an identifier (used as both task-identifier and job-key),
a standard UNIX 5-field cron expression, and optional per-entry job creation
overrides. Cron state (last_execution) is persisted to skivi.known_crontabs
so the scheduler survives restarts without double-firing.

Jobs are enqueued with job_key_mode = unsafe_dedupe: if the previous run's
job is still queued or in flight, the new enqueue is a no-op.

Typical usage:

  (def crontabs
    [{:identifier "daily-report"
      :schedule   "0 2 * * *"
      :spec       {:queue-name "maintenance" :max-attempts 3}}
     {:identifier "hourly-cleanup"
      :schedule   "0 * * * *"}])

  (def scheduler
    (-> (scheduler/create-scheduler sys crontabs emitter {:timezone "Europe/London"})
        scheduler/start!))

  ;; On application shutdown
  (scheduler/stop! scheduler 5000)

Standard events emitted:
  :cron/fired - a cron entry fired and a job was enqueued; :identifier in data
raw docstring

dev.skivi.scheduler.schema

Malli schemas for the scheduler component.

Malli schemas for the scheduler component.
raw docstring

dev.skivi.skivi.core

Unified entry point that wires all skivi components into a single system map.

Typical usage:

(def system (-> (skivi/create-system config {"send-email" handle-send-email}) (skivi/start!)))

(skivi/add-job system "send-email" {:to "x@y.com"})

;; On shutdown (skivi/stop! system)

Unified entry point that wires all skivi components into a single system map.

Typical usage:

  (def system
    (-> (skivi/create-system config {"send-email" handle-send-email})
        (skivi/start!)))

  (skivi/add-job system "send-email" {:to "x@y.com"})

  ;; On shutdown
  (skivi/stop! system)
raw docstring

dev.skivi.validation.interface

Job payload validation interface supporting malli schemas, clojure.spec specs, and composite chains of validators.

Typical usage:

;; From config (malli schemas or clojure.spec keywords in [:schema :job-schemas]) (def v (create-validator config))

;; Directly (def v (malli-validator {:send-email [:map [:to :string] [:subject :string]]}))

;; Validate (validate-payload v :send-email {:to "x@y.com" :subject "Hi"})

To enable malli instrumentation of this namespace:

(require '[malli.instrument :as mi]) (mi/instrument!) (mi/unstrument!)

Job payload validation interface supporting malli schemas, clojure.spec specs,
and composite chains of validators.

Typical usage:

  ;; From config (malli schemas or clojure.spec keywords in [:schema :job-schemas])
  (def v (create-validator config))

  ;; Directly
  (def v (malli-validator {:send-email [:map [:to :string] [:subject :string]]}))

  ;; Validate
  (validate-payload v :send-email {:to "x@y.com" :subject "Hi"})

To enable malli instrumentation of this namespace:

  (require '[malli.instrument :as mi])
  (mi/instrument!)
  (mi/unstrument!)
raw docstring

dev.skivi.worker-pool.core

Thread-pool-based worker pool that claims jobs from the queue and dispatches them to registered task functions.

All worker threads share one queue and one worker-id, so jobs are claimed atomically in batches and distributed via take-job!. The pool does not own the job-system or emitter; callers supply those at creation time.

Task functions receive a context map with :job, :job-system, and :worker-id. They signal outcomes as follows: • return any value (including nil) - job succeeds (WorkerCompletesJob) • return (partial-success m) - partial success (WorkerReportsPartialSuccess) • throw any Throwable - job fails (WorkerFailsJob)

Events emitted: :pool/start - pool is started (includes :concurrency) :pool/stop - pool has stopped (includes :forced? when force-stop!) :job/completed - task fn returned normally :job/failed - task fn threw; job retry scheduled :job/exhausted - task fn threw on the final allowed attempt :job/partial-success - task fn returned a partial-success value :worker/error - infrastructure exception outside task execution

Thread-pool-based worker pool that claims jobs from the queue and dispatches
them to registered task functions.

All worker threads share one queue and one worker-id, so jobs are claimed
atomically in batches and distributed via take-job!. The pool does not own the
job-system or emitter; callers supply those at creation time.

Task functions receive a context map with :job, :job-system, and :worker-id.
They signal outcomes as follows:
  • return any value (including nil) - job succeeds (WorkerCompletesJob)
  • return (partial-success m)       - partial success (WorkerReportsPartialSuccess)
  • throw any Throwable              - job fails   (WorkerFailsJob)

Events emitted:
  :pool/start          - pool is started (includes :concurrency)
  :pool/stop           - pool has stopped (includes :forced? when force-stop!)
  :job/completed       - task fn returned normally
  :job/failed          - task fn threw; job retry scheduled
  :job/exhausted       - task fn threw on the final allowed attempt
  :job/partial-success - task fn returned a partial-success value
  :worker/error        - infrastructure exception outside task execution
raw docstring

dev.skivi.worker-pool.interface

Concurrent worker pool that claims jobs from the queue and executes task handlers.

All workers share a single local queue backed by job-manager. The pool uses one worker-id for all job claims; see worker-id to retrieve it.

Task functions are plain Clojure functions keyed by task-identifier string:

(def tasks {"send-email" (fn [{:keys [job]}] (send! (:payload job))) "resize-image" (fn [{:keys [job job-system worker-id]}] ...)})

A task fn receives a context map with: :job - the full job map returned by job-manager/get-jobs :job-system - the {:pool :validator} system map :worker-id - the pool's worker-id string

Task outcome is determined by the return value or exception: • return any value (including nil) - job completes successfully • return (partial-success m) - job reported as partial success • throw any Throwable - job is failed (retry or exhaust)

Typical usage:

(def pool (-> (worker-pool/create-pool sys tasks emitter {:concurrency 4}) worker-pool/start!))

;; On application shutdown (worker-pool/stop! pool 15000)

Standard events emitted by the pool (via the supplied emitter): :pool/start - pool started; :concurrency in data :pool/stop - pool stopped; :forced? in data :job/completed - task fn returned normally :job/failed - task fn threw; retry scheduled :job/exhausted - task fn threw on the final allowed attempt :job/partial-success - task fn returned a partial-success value :worker/error - infrastructure exception outside task execution

Concurrent worker pool that claims jobs from the queue and executes task handlers.

All workers share a single local queue backed by job-manager. The pool uses one
worker-id for all job claims; see worker-id to retrieve it.

Task functions are plain Clojure functions keyed by task-identifier string:

  (def tasks
    {"send-email"  (fn [{:keys [job]}] (send! (:payload job)))
     "resize-image" (fn [{:keys [job job-system worker-id]}] ...)})

A task fn receives a context map with:
  :job        - the full job map returned by job-manager/get-jobs
  :job-system - the {:pool :validator} system map
  :worker-id  - the pool's worker-id string

Task outcome is determined by the return value or exception:
  • return any value (including nil) - job completes successfully
  • return (partial-success m)       - job reported as partial success
  • throw any Throwable               - job is failed (retry or exhaust)

Typical usage:

  (def pool
    (-> (worker-pool/create-pool sys tasks emitter {:concurrency 4})
        worker-pool/start!))

  ;; On application shutdown
  (worker-pool/stop! pool 15000)

Standard events emitted by the pool (via the supplied emitter):
  :pool/start          - pool started; :concurrency in data
  :pool/stop           - pool stopped; :forced? in data
  :job/completed       - task fn returned normally
  :job/failed          - task fn threw; retry scheduled
  :job/exhausted       - task fn threw on the final allowed attempt
  :job/partial-success - task fn returned a partial-success value
  :worker/error        - infrastructure exception outside task execution
raw docstring

dev.skivi.worker-pool.schema

Malli schemas for the worker-pool component.

Malli schemas for the worker-pool component.
raw docstring

cljdoc builds & hosts documentation for Clojure/Script libraries

Keyboard shortcuts
Ctrl+kJump to recent docs
Move to previous article
Move to next article
Ctrl+/Jump to the search field
× close