Event emitter with fan-out delivery, a ring buffer, and operational stats.
Emitters are plain maps with atom-backed state. Create one per system
and pass it to components that need to emit events (worker-pool, queue, etc.).
Typical usage:
;; Create from the :events section of MonitoringConfig
(def emitter (monitoring/create-emitter {:enabled true :buffer-size 500}))
;; Register a structured logging handler for all events
(monitoring/on emitter :all
  (fn [{:keys [type data]}]
    (log/info "event" (assoc data :event/type type))))
;; Register a handler for a specific event type
(def hid (monitoring/on emitter :job/claimed
           (fn [event] (metrics/increment! :jobs-claimed))))
;; Emit from a component
(monitoring/emit! emitter :job/claimed {:job-id id :worker-id wid})
;; Deregister when no longer needed
(monitoring/off emitter hid)
;; Inspect recent events (ring buffer snapshot)
(monitoring/events emitter)
;; In tests: use noop-emitter to silence all monitoring
(monitoring/noop-emitter)
;; In tests: use collecting-emitter to assert events were emitted
(let [em (monitoring/collecting-emitter)]
  (do-something em)
  (is (some #(= :job/claimed (:type %)) (monitoring/events em))))
Standard event types emitted by skivi components:
:job/claimed - worker claimed a job from the database
:job/completed - worker completed a job successfully
:job/failed - worker failed a job (retry eligible)
:job/exhausted - worker failed a job at max_attempts
:job/partial-success - worker reported partial success
:queue/locked - a named job queue was locked by a worker
:queue/unlocked - a named job queue was released
:cron/fired - a cron tab entry scheduled a job
:worker/error - an unhandled exception occurred in a worker
:pool/start - the worker pool started
:pool/stop - the worker pool stopped
(collecting-emitter)
Returns an emitter that buffers every event without dropping. Call events to retrieve all emitted events. Use in tests.
(create-emitter)
(create-emitter config)
Creates an event emitter from an events config map. config keys: :enabled (boolean), :buffer-size (pos-int). Called with no args, creates an emitter with sensible defaults.
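A malformed config map is easier to catch before the emitter is built. The sketch below checks the two documented keys. `valid-events-config?` is a hypothetical helper, not part of the library, and treating both keys as optional is an assumption based only on the existence of the zero-argument arity.

```clojure
;; Hypothetical helper, not part of the library: validates the two documented
;; config keys. Treating both keys as optional is an assumption.
(defn valid-events-config?
  [config]
  (and (map? config)
       (or (not (contains? config :enabled))
           (boolean? (:enabled config)))
       (or (not (contains? config :buffer-size))
           (pos-int? (:buffer-size config)))))

(valid-events-config? {:enabled true :buffer-size 500}) ; => true
(valid-events-config? {:enabled true :buffer-size 0})   ; => false
```

A caller could run this check at system start and fail fast instead of discovering a bad :buffer-size later.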
(emit! emitter event-type data)
Emits an event of event-type with data map to all registered handlers. Exceptions thrown by handlers are caught and counted as :dropped. Returns nil. No-op when emitter is disabled.
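The delivery contract described here (fan-out, isolated handler failures, a :dropped counter) can be illustrated with a small standalone model. This is a sketch under stated assumptions, not the library's source: `make-sketch-emitter`, `sketch-on`, and `sketch-emit!` are hypothetical names, and representing :emitted-at as epoch milliseconds is a guess.

```clojure
;; Hypothetical model of the documented behaviour, not skivi's implementation.
(defn make-sketch-emitter []
  {:handlers (atom {})                        ; handler-id -> {:type kw, :fn f}
   :stats    (atom {:emitted 0 :dropped 0})})

(defn sketch-on
  "Registers handler-fn for event-type (or :all) and returns a UUID handler id."
  [emitter event-type handler-fn]
  (let [id (java.util.UUID/randomUUID)]
    (swap! (:handlers emitter) assoc id {:type event-type :fn handler-fn})
    id))

(defn sketch-emit!
  "Delivers the event to every matching handler. A throwing handler is
  counted as :dropped and does not stop delivery to the others."
  [emitter event-type data]
  (let [event {:type       event-type
               :data       data
               ;; Assumption: the real timestamp representation is unknown.
               :emitted-at (System/currentTimeMillis)}]
    (swap! (:stats emitter) update :emitted inc)
    (doseq [{handler-type :type handler :fn} (vals @(:handlers emitter))
            :when (or (= handler-type :all) (= handler-type event-type))]
      (try
        (handler event)
        (catch Exception _
          (swap! (:stats emitter) update :dropped inc))))
    nil))
```

The point of the try/catch inside the loop is that one faulty handler cannot break the emitting component or starve the remaining handlers.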
(enabled? emitter)
Returns true if the emitter will route events to handlers.
(events emitter)
Returns a snapshot of buffered events in emission order (oldest first). Each event is a map with :type, :data, and :emitted-at keys.
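To make the "ring buffer, oldest first" behaviour concrete, here is one way a bounded buffer can be modelled with a vector in an atom. It is only an illustration: `push-bounded` is a hypothetical function, and the library may well use a different data structure internally.

```clojure
;; Hypothetical sketch of a bounded, oldest-first event buffer.
(defn push-bounded
  "Appends event to the vector buf, then discards the oldest entries so that
  at most capacity events remain."
  [buf capacity event]
  (let [grown    (conj buf event)
        overflow (- (count grown) capacity)]
    (if (pos? overflow)
      ;; Copy into a fresh vector so the discarded events can be collected.
      (into [] (subvec grown overflow))
      grown)))

(def ring (atom []))

(doseq [n (range 5)]
  (swap! ring push-bounded 3 {:type :job/claimed :data {:job-id n}}))

(mapv (comp :job-id :data) @ring)
;; => [2 3 4]
```

Dereferencing the atom yields an immutable vector, which is why a snapshot taken this way stays stable while new events keep arriving.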
(noop-emitter)
Returns a disabled emitter. All emit! calls and handler registrations are no-ops. Use when monitoring is disabled in config or in unit tests.
(off emitter handler-id)
Removes the handler identified by handler-id. Returns nil.
(on emitter event-type handler-fn)
Registers handler-fn to receive events of event-type. Pass :all to receive every event. Returns a handler-id (UUID) for deregistration via off. Returns nil when the emitter is disabled.
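Because on returns nil for a disabled emitter, code that registers a handler temporarily may want to guard the matching off call. The sketch below shows one such pattern. `call-with-handler` is a hypothetical helper that takes the registration functions as arguments so it stays self-contained. Whether off tolerates a nil handler-id is not stated here, so the guard is purely defensive.

```clojure
;; Hypothetical helper: registers a handler for the duration of body-fn and
;; always deregisters afterwards. on-fn and off-fn stand in for the
;; namespace's on and off functions.
(defn call-with-handler
  [on-fn off-fn emitter event-type handler-fn body-fn]
  (let [hid (on-fn emitter event-type handler-fn)]
    (try
      (body-fn)
      (finally
        ;; hid is nil when the emitter is disabled; skip deregistration then.
        (when hid
          (off-fn emitter hid))))))
```

With the real namespace loaded, a call might look like `(call-with-handler monitoring/on monitoring/off emitter :job/failed handler-fn #(run-batch))`, where `handler-fn` and `run-batch` are placeholders for the caller's own functions.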
(stats emitter)
Returns a snapshot of emitter operational metrics. Keys: :emitted (total events emitted), :dropped (handler errors caught).
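Since the counters are cumulative, comparing two snapshots is a simple way to see what happened during a particular stretch of work. The following is a minimal sketch. `stats-delta` is a hypothetical helper that operates on plain maps shaped like the documented snapshot.

```clojure
;; Hypothetical helper: difference between two stats snapshots.
(defn stats-delta
  "Returns how much :emitted and :dropped grew between before and after."
  [before after]
  (merge-with -
              (select-keys after [:emitted :dropped])
              (select-keys before [:emitted :dropped])))

(stats-delta {:emitted 10 :dropped 1}
             {:emitted 14 :dropped 1})
;; => {:emitted 4, :dropped 0}
```

In a test, asserting that the :dropped delta is zero is one way to confirm that no handler threw while the code under test ran.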