Postgres based background processing worker system thing.

Built on top of next.jdbc, hikari-cp, hugsql and clojure.tools.logging.
:warning: Definitely not usable yet; see the dev-resources/taskmaster/ directory for samples and examples.
Recommended environment: recent versions of Postgres, Clojure and the JDK, but it will probably work with Postgres 9.6, Clojure 1.9 and JDK 8.
You'll need a Postgres server with a database created. Then you can create the necessary jobs table structure:

```clojure
(taskmaster.operation/create-jobs-table! jdbc-connection)
```
:warning: This has to run before you start any consumers, as they will attempt to fetch any unprocessed jobs while starting!
Now you can define your consumer, and a callback function which will process each job. If processing succeeded, return the qualified keyword :taskmaster.operation/ack. If not, return :taskmaster.operation/reject.

Rejecting keeps the failed job's data in the jobs table, so that you can inspect it and retry the job by resetting its run_out column to 0.
```clojure
(defn callback [{:keys [payload]}]
  (if (do/some-other-work payload)
    :taskmaster.operation/ack
    :taskmaster.operation/reject))
```
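Because the callback's return value decides the job's fate, a common pattern is to catch exceptions and reject explicitly rather than letting them escape the worker thread. Here's a minimal sketch; `process!` is a hypothetical application function, not part of Taskmaster:

```clojure
;; `process!` stands in for your real work; here it just checks the payload.
(defn process! [payload]
  (boolean (:send-email payload)))

;; A defensive callback: any thrown exception becomes a reject,
;; so the failed job stays in the table for inspection and retry.
(defn safe-callback [{:keys [payload]}]
  (try
    (if (process! payload)
      :taskmaster.operation/ack
      :taskmaster.operation/reject)
    (catch Exception _e
      :taskmaster.operation/reject)))
```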
Let's define a consumer:
```clojure
(def consumer
  (taskmaster.queue/start! jdbc-conn {:queue-name "do_work_yo"
                                      :callback callback
                                      :concurrency 3}))
```
The consumer will spin up one listener thread, which is notified about new records matching the queue name being inserted, and 3 threads to process these jobs in parallel, one job at a time each.
Now let's queue up some jobs:
```clojure
(taskmaster.queue/put! jdbc-conn {:queue-name "do_work_yo" :payload {:send-email "test@example.com"}})
```
By default, job payloads are stored as JSON and Taskmaster is set up to serialize/deserialize them using Cheshire.
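For reference, the round-trip looks roughly like this (a sketch using Cheshire directly; it assumes payloads come back with keywordized keys, which matches the keyword lookups in the callback examples):

```clojure
(require '[cheshire.core :as json])

;; The payload map is stored as a JSON string...
(def payload {:send-email "test@example.com"})
(def stored (json/generate-string payload))

;; ...and parsed back with keywordized keys on the consumer side.
(def restored (json/parse-string stored true))
;; restored => {:send-email "test@example.com"}
```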
That's it. You can now build other fun things on top of this.
The recommended way is to use a Component-based approach, but it's not strictly necessary:
```clojure
(require '[taskmaster.dev.connection :as c]
         '[taskmaster.component :as com]
         '[clojure.tools.logging :as log]
         '[com.stuartsierra.component :as component])

(def qs (atom []))

;; `component` is the whole consumer component here - so you have access to its dependencies
(defn callback [{:keys [id queue-name payload component] :as job}]
  (log/infof "got-job t=%s q=%s %s" component queue-name payload)
  (swap! qs conj id)
  (log/info (count (set @qs)))
  (let [res (if (and (:some-number payload) (even? (:some-number payload)))
              :taskmaster.operation/ack
              :taskmaster.operation/reject)]
    (log/info res)
    res))
```
```clojure
(def system
  {:db-conn (c/make-one)
   :consumer (component/using
              (com/create-consumer {:queue-name "t3"
                                    :callback callback
                                    :concurrency 2})
              [:db-conn :some-thing])
   :some-thing {:some :thing}
   :publisher (component/using
               (com/create-publisher)
               [:db-conn])})

(def SYS
  (component/start-system (component/map->SystemMap system)))

(com/put! (:publisher SYS) {:queue-name "t3" :payload {:some-number 2}})

(component/stop SYS)
```
There are three core parts:

- LISTEN / NOTIFY triggers and functions
- j.u.c ConcurrentLinkedQueue
- SELECT ... FOR UPDATE SKIP LOCKED
When the job table is set up, a trigger is added that sends a NOTIFY whenever a new record is inserted. Taskmaster then sets up a listener to receive pings whenever inserts happen. These pings are passed over a ConcurrentLinkedQueue to a pool of threads, which pull job payloads from the table inside a transaction and ensure atomicity via SELECT ... FOR UPDATE SKIP LOCKED.
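The listener-to-worker hand-off can be sketched like this (a simplified illustration of the pattern, not Taskmaster's actual internals):

```clojure
(import '(java.util.concurrent ConcurrentLinkedQueue))

;; The listener thread offers a ping per NOTIFY; worker threads poll
;; and, on a hit, would open a transaction and fetch job rows with
;; SELECT ... FOR UPDATE SKIP LOCKED so no two workers grab the same job.
(def pings (ConcurrentLinkedQueue.))

;; listener thread, on each NOTIFY:
(.offer pings "do_work_yo")

;; worker thread loop body:
(when-let [queue-name (.poll pings)] ;; nil when there's nothing to do
  ;; a real worker would lock and fetch jobs for `queue-name` here
  queue-name)
```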
Inspired by:
Copyright © 2020 Łukasz Korecki All rights reserved. The use and distribution terms for this software are covered by the Eclipse Public License 1.0 which can be found at http://opensource.org/licenses/eclipse-1.0.php By using this software in any fashion, you are agreeing to be bound by the terms of this license. You must not remove this notice, or any other, from this software.