A Postgres-based background job processing system.
:warning: Somewhat usable; see the dev-resources/taskmaster/ directory for samples and examples.
Requirements: it will probably work with Postgres 9.6, Clojure 1.9, and JDK 8 (or newer).
You'll need a Postgres server with a database created. Then you can create the necessary jobs table structure:
:warning: This has to run before you start any consumers, as they will attempt to fetch any unprocessed jobs while starting!

```clojure
(taskmaster.operation/create-jobs-table! jdbc-connection)
```
Now you can define your consumers and a handler function that will process each job. If processing succeeded, return the qualified keyword :taskmaster.operation/ack; if not, return :taskmaster.operation/reject.
Example:

```clojure
(defn handler [{:keys [payload]}]
  (if (do/some-other-work payload)
    :taskmaster.operation/ack
    :taskmaster.operation/reject))
```
Rejecting a job keeps the failed job's data in the jobs table so that you can requeue it:

```clojure
(taskmaster.operation/requeue! conn {:id [failed-job-id-1 failed-job-id-2]})
```
Failed jobs can be found by querying for `run_count > 0`, or via:

```clojure
(taskmaster.operation/find-failed-jobs conn {:queue-name queue-name})
```
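The two calls above can be glued together. Here's a minimal sketch, assuming the rows returned by find-failed-jobs carry an :id key; the helper name `requeue-args` is hypothetical, not part of Taskmaster:

```clojure
;; Hypothetical helper: turn rows returned by find-failed-jobs
;; (assumed to each contain an :id key) into the map requeue! expects.
(defn requeue-args [failed-jobs]
  {:id (mapv :id failed-jobs)})

;; Sketch of the full flow (requires a live connection):
;; (->> (taskmaster.operation/find-failed-jobs conn {:queue-name "do_work_yo"})
;;      requeue-args
;;      (taskmaster.operation/requeue! conn))
```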
Let's define a consumer:

```clojure
(def consumer
  (taskmaster.queue/start! db-conn {:queue-name "do_work_yo"
                                    :handler handler
                                    :concurrency 3}))
```
The consumer will spin up one listener thread, which is notified whenever new records matching the queue name are inserted, and three worker threads that process these jobs in parallel, one job at a time each.
Now let's queue up some jobs:

```clojure
(taskmaster.queue/put! db-conn {:queue-name "do_work_yo"
                                :payload {:send-email "test@example.com"}})
```
By default, job payloads are stored as JSON, and Taskmaster is set up to serialize/deserialize them using Cheshire.
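For illustration, the round trip through Cheshire looks roughly like this (a sketch; Taskmaster does this for you internally when writing and reading the payload column):

```clojure
(require '[cheshire.core :as json])

;; Serialize a payload to JSON, then read it back with keyword keys.
(def payload {:send-email "test@example.com"})

(def as-json (json/generate-string payload))
;; => "{\"send-email\":\"test@example.com\"}"

(json/parse-string as-json true)
;; => {:send-email "test@example.com"}
```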
That's it! You can now add other fun things, like a Component-based setup. This approach is recommended, but not strictly necessary. A publisher component isn't required either, as you can invoke taskmaster.queue/put! directly, but it's good to have one anyway, especially in tests.
```clojure
(require '[taskmaster.dev.connection :as c]
         '[taskmaster.component :as com]
         '[clojure.tools.logging :as log]
         '[com.stuartsierra.component :as component])
```
```clojure
;; stores all payloads
(def qs (atom []))

;; `component` is the whole consumer component here, so you have access to its dependencies
(defn handler [{:keys [id queue-name payload component] :as job}]
  (log/infof "got-job t=%s q=%s %s" component queue-name payload)
  (swap! qs conj id)
  (log/info (count (set @qs)))
  ;; fail the job if :some-number in the payload is not even
  (let [res (if (and (:some-number payload) (even? (:some-number payload)))
              :taskmaster.operation/ack
              :taskmaster.operation/reject)]
    (log/info res)
    res))
```
```clojure
(def system
  {:db-conn (c/make-one)
   :consumer (component/using
              (com/create-consumer {:queue-name "t3"
                                    :handler handler
                                    :concurrency 2})
              [:db-conn :some-thing])
   :some-thing {:some :thing}
   :publisher (component/using
               (com/create-publisher)
               [:db-conn])})

(def SYS
  (component/start-system (component/map->SystemMap system)))

(com/put! (:publisher SYS) {:queue-name "t3" :payload {:some-number 2}})

;; wait a sec
(component/stop SYS)
```
There are three core parts:

- `LISTEN` / `NOTIFY` triggers and functions
- `java.util.concurrent.ConcurrentLinkedQueue`
- `SELECT ... FOR UPDATE SKIP LOCKED`

When the jobs table is set up, a trigger is added that sends a `NOTIFY` whenever a new record is inserted. Taskmaster then sets up a listener to receive a ping whenever an insert happens. These pings are passed over a `ConcurrentLinkedQueue` to a pool of threads, which pull job payloads from the table in a transaction and ensure atomicity via `SELECT ... FOR UPDATE SKIP LOCKED`.
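To make the ping flow concrete, here's a toy, self-contained sketch (not Taskmaster's actual code) of wakeups travelling over a `ConcurrentLinkedQueue` to a small worker pool. In the real implementation, each successful poll is followed by a transactional `SELECT ... FOR UPDATE SKIP LOCKED` fetch rather than a counter increment:

```clojure
(import '(java.util.concurrent ConcurrentLinkedQueue CountDownLatch))

;; One ::wakeup ping is offered per NOTIFY; workers poll the shared queue.
(def pings (ConcurrentLinkedQueue.))
(def processed (atom 0))

(defn worker [^CountDownLatch done]
  (future
    (loop []
      (if (.poll pings)                 ; poll is atomic: each ping is consumed once
        (do (swap! processed inc)       ; stand-in for fetching a job in a tx
            (.countDown done)
            (recur))
        (when (pos? (.getCount done))   ; keep polling until all pings are handled
          (Thread/sleep 10)
          (recur))))))

(let [done (CountDownLatch. 3)]
  (dotimes [_ 3] (.offer pings ::wakeup)) ; three inserts -> three NOTIFY pings
  (dotimes [_ 2] (worker done))           ; concurrency of 2
  (.await done)
  @processed)
;; => 3
```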
Taskmaster is built on top of:

- `clojure.tools.logging` for logs (this repo uses Logback as the backend)
- `cheshire` for JSONB serialization
- `next.jdbc`
- `hikari-cp` for the connection pool
- `hugsql` for all of the queries

Most of it is wrapped by `nomnom/utility-belt.sql` to smooth out the rough edges.
Not officially released yet; check Clojars for the latest version.
- [x] :bug: fix a bug where restarting consumers would pick up previously failed jobs
- [ ] non-deleting mode, where acked jobs stay in the table; useful for reprocessing jobs or gathering some extra metrics
- [ ] verify this actually works in production workloads
- [ ] pluggable serialization (Avro, plain text, etc.), maybe via middleware?
- [ ] remove HugSQL and nomnom/utility-belt.sql and use next.jdbc directly
Inspired by:
Copyright © 2020 Łukasz Korecki All rights reserved. The use and distribution terms for this software are covered by the Eclipse Public License 1.0 which can be found at http://opensource.org/licenses/eclipse-1.0.php By using this software in any fashion, you are agreeing to be bound by the terms of this license. You must not remove this notice, or any other, from this software.