An opinionated Datomic queue for building (more) reliable systems. Implements the transactional outbox pattern. Supports retries, backoff and more. On-prem only.
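(require '[com.github.ivarref.yoltq :as yq]
         '[datomic.api :as d]
         ; Assumption: the `log` alias used below is clojure.tools.logging
         '[clojure.tools.logging :as log])
(def conn (d/connect "..."))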
; Initialize system
(yq/init! {:conn conn})
; Add a queue consumer that will intentionally fail on the first attempt
(yq/add-consumer! :q
  (let [cnt (atom 0)]
    (fn [payload]
      (when (= 1 (swap! cnt inc))
        ; A consumer throwing an exception is considered a queue job failure
        (throw (ex-info "failed" {})))
      ; Anything other than a thrown exception is considered a queue job success.
      ; This includes nil, false and everything else.
      (log/info "got payload" payload))))
; Start threadpool
(yq/start!)
; Queue a job
@(d/transact conn [(yq/put :q {:work 123})])
; On your console you will see something like this:
; 17:29:54.598 DEBUG queue item 613... for queue :q is pending status :init
; 17:29:54.602 DEBUG queue item 613... for queue :q now has status :processing
; 17:29:54.603 DEBUG queue item 613... for queue :q is now processing
; 17:29:54.605 WARN queue-item 613... for queue :q now has status :error after 1 try in 4.8 ms
; 17:29:54.607 WARN error message was: "failed" for queue-item 613...
; 17:29:54.615 WARN ex-data was: {} for queue-item 613...
; So far the item has failed...
; But after approximately 10 seconds have elapsed, the item will be retried:
; 17:30:05.596 DEBUG queue item 613... for queue :q now has status :processing
; 17:30:05.597 DEBUG queue item 613... for queue :q is now processing
; 17:30:05.597 INFO got payload {:work 123}
; 17:30:05.599 INFO queue-item 613... for queue :q now has status :done after 2 tries in 5999.3 ms
; And then it has succeeded.
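The fail-then-succeed flow in the log above can be reproduced without a database. The following is a minimal sketch only: run-with-retries is a hypothetical helper, not part of yoltq, and it retries immediately instead of waiting with backoff as the real system does.

```clojure
;; Hypothetical helper, not part of yoltq: call a consumer until it stops throwing.
(defn run-with-retries
  "Calls (consumer payload) up to max-tries times. A thrown exception counts as
  a failed try; any return value, including nil and false, counts as success."
  [consumer payload max-tries]
  (loop [tries 1]
    (let [outcome (try
                    (consumer payload)
                    :done
                    (catch Exception _
                      :error))]
      (if (or (= :done outcome) (>= tries max-tries))
        {:status outcome :tries tries}
        (recur (inc tries))))))

;; Same shape as the consumer above: it fails on the first call only.
(def flaky-consumer
  (let [cnt (atom 0)]
    (fn [payload]
      (when (= 1 (swap! cnt inc))
        (throw (ex-info "failed" {})))
      payload)))

(def result (run-with-retries flaky-consumer {:work 123} 3))
```

Here result reports status :done after two tries, mirroring the :error then :done sequence in the log.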
Integrating with external systems that may be unavailable can be tricky. Imagine the following code:
(defn post-handler [user-input]
  (let [db-item (process user-input)
        ext-ref (clj-http.client/post ext-service {...})] ; may throw exception
    @(d/transact conn [(assoc db-item :some/ext-ref ext-ref)])))
What if the POST request fails? Should it be retried? For how long? Should it be allowed to fail? How do you then process failures later?
The queue way to solve this would be:
(defn get-ext-ref [{:keys [id]}]
  (let [ext-ref (clj-http.client/post ext-service {...})] ; may throw exception
    @(d/transact conn [[:db/cas [:some/id id]
                        :some/ext-ref
                        nil
                        ext-ref]])))

(yq/add-consumer! :get-ext-ref get-ext-ref {:allow-cas-failure? true})
(defn post-handler [user-input]
  (let [{:some/keys [id] :as db-item} (process user-input)]
    ; The entity and the queue job are written in the same transaction
    @(d/transact conn [db-item
                       (yq/put :get-ext-ref {:id id})])))
Here post-handler will always succeed as long as the transaction commits.
get-ext-ref may fail multiple times if ext-service is down.
This is fine as long as it eventually succeeds.
There is a special case where get-ext-ref succeeds, but saving the new queue job status to the database fails.
The job will then be run again, so get-ext-ref, like any queue consumer, should tolerate being executed successfully several times.
For get-ext-ref this is solved by using the database function :db/cas (compare-and-swap) to achieve write-once behaviour.
The yoltq system treats cas failures as job successes when a consumer has :allow-cas-failure? set to true in its options.
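The write-once idea can be illustrated without Datomic. This is a sketch under stated assumptions, not yoltq's implementation: the atom db stands in for the database, cas! loosely mimics :db/cas, and run-job is a hypothetical runner showing how a cas failure might be mapped to success.

```clojure
;; In-memory stand-in for the database; this is not Datomic.
(def db (atom {}))

(defn cas!
  "Sets (get-in @db [id attr]) to new-val only if it currently equals old-val.
  Throws ex-info otherwise, loosely mimicking a failed :db/cas."
  [id attr old-val new-val]
  (let [[before _] (swap-vals! db (fn [m]
                                    (if (= old-val (get-in m [id attr]))
                                      (assoc-in m [id attr] new-val)
                                      m)))]
    (when (not= old-val (get-in before [id attr]))
      (throw (ex-info "cas failed" {:error :cas-failed})))
    new-val))

(defn run-job
  "Hypothetical job runner: a cas failure counts as success when
  allow-cas-failure? is true; any other exception is a failure."
  [consumer payload {:keys [allow-cas-failure?]}]
  (try
    (consumer payload)
    :done
    (catch Exception e
      (if (and allow-cas-failure?
               (= :cas-failed (:error (ex-data e))))
        :done
        :error))))

(def calls (atom 0))

(defn get-ext-ref [{:keys [id]}]
  ;; Stand-in for the external POST: yields a new reference on every call.
  (let [ext-ref (str "ref-" (swap! calls inc))]
    (cas! id :some/ext-ref nil ext-ref)))

(def first-run  (run-job get-ext-ref {:id 1} {:allow-cas-failure? true}))
(def second-run (run-job get-ext-ref {:id 1} {:allow-cas-failure? true}))
(def strict-run (run-job get-ext-ref {:id 1} {:allow-cas-failure? false}))
```

The first run stores the reference. The second run hits a cas failure that is reported as :done, and the stored value is left unchanged. Without the option, the same failure is reported as :error.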