Event emitter: fan-out delivery to registered handlers with a ring buffer.
Handlers registered with :all receive every event regardless of type. Handlers registered with a specific keyword receive only that event type. Handler errors are caught, counted in :dropped stats, and never propagate.
Thread safety: all mutable state lives in atoms updated with swap!/reset!. emit! and handler registration are safe to call concurrently.
Event emitter: fan-out delivery to registered handlers with a ring buffer. Handlers registered with :all receive every event regardless of type. Handlers registered with a specific keyword receive only that event type. Handler errors are caught, counted in :dropped stats, and never propagate. Thread safety: all mutable state lives in atoms updated with swap!/reset!. emit! and handler registration are safe to call concurrently.
Event emitter with fan-out delivery, a ring buffer, and operational stats.
Emitters are plain maps with atom-backed state. Create one per system and pass it to components that need to emit events (worker-pool, queue, etc.).
Typical usage:
;; Create from the :events section of MonitoringConfig (def emitter (monitoring/create-emitter {:enabled true :buffer-size 500}))
;; Register a structured logging handler for all events (monitoring/on emitter :all (fn [{:keys [type data]}] (log/info "event" (assoc data :event/type type))))
;; Register a handler for a specific event type (def hid (monitoring/on emitter :job/claimed (fn [event] (metrics/increment! :jobs-claimed))))
;; Emit from a component (monitoring/emit! emitter :job/claimed {:job-id id :worker-id wid})
;; Deregister when no longer needed (monitoring/off emitter hid)
;; Inspect recent events (ring buffer snapshot) (monitoring/events emitter)
;; In tests: use noop-emitter to silence all monitoring (monitoring/noop-emitter)
;; In tests: use collecting-emitter to assert events were emitted (let [em (monitoring/collecting-emitter)] (do-something em) (is (some #(= :job/claimed (:type %)) (monitoring/events em))))
Standard event types emitted by skivi components: :job/claimed - worker claimed a job from the database :job/completed - worker completed a job successfully :job/failed - worker failed a job (retry eligible) :job/exhausted - worker failed a job at max_attempts :job/partial-success - worker reported partial success :queue/locked - a named job queue was locked by a worker :queue/unlocked - a named job queue was released :cron/fired - a cron tab entry scheduled a job :worker/error - an unhandled exception occurred in a worker :pool/start - the worker pool started :pool/stop - the worker pool stopped
Event emitter with fan-out delivery, a ring buffer, and operational stats.
Emitters are plain maps with atom-backed state. Create one per system
and pass it to components that need to emit events (worker-pool, queue, etc.).
Typical usage:
;; Create from the :events section of MonitoringConfig
(def emitter (monitoring/create-emitter {:enabled true :buffer-size 500}))
;; Register a structured logging handler for all events
(monitoring/on emitter :all
(fn [{:keys [type data]}]
(log/info "event" (assoc data :event/type type))))
;; Register a handler for a specific event type
(def hid (monitoring/on emitter :job/claimed
(fn [event] (metrics/increment! :jobs-claimed))))
;; Emit from a component
(monitoring/emit! emitter :job/claimed {:job-id id :worker-id wid})
;; Deregister when no longer needed
(monitoring/off emitter hid)
;; Inspect recent events (ring buffer snapshot)
(monitoring/events emitter)
;; In tests: use noop-emitter to silence all monitoring
(monitoring/noop-emitter)
;; In tests: use collecting-emitter to assert events were emitted
(let [em (monitoring/collecting-emitter)]
(do-something em)
(is (some #(= :job/claimed (:type %)) (monitoring/events em))))
Standard event types emitted by skivi components:
:job/claimed - worker claimed a job from the database
:job/completed - worker completed a job successfully
:job/failed - worker failed a job (retry eligible)
:job/exhausted - worker failed a job at max_attempts
:job/partial-success - worker reported partial success
:queue/locked - a named job queue was locked by a worker
:queue/unlocked - a named job queue was released
:cron/fired - a cron tab entry scheduled a job
:worker/error - an unhandled exception occurred in a worker
:pool/start - the worker pool started
:pool/stop - the worker pool stoppedMalli schemas for the monitoring component.
Malli schemas for the monitoring component.
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |