A Clojure library that implements a MySQL-backed durable queue with support for scheduled jobs.
Get it from Clojars:
Database-backed queues are known to be fraught with various problems. It'd be unwise to deny this: compared to specialized queue and messaging systems they're slow and inefficient. If one's looking at processing hundreds of jobs per second, it is certainly advisable to consider a specialized solution. For smaller projects though, using a relational database as a queue offers considerable benefits:
The latter point is also important if what you're after is just a scheduler to push jobs to your internal queue. For example, you may decide to use a database table to store scheduled jobs that will later be executed via Zach Tellman's durable-queue. Nevertheless, relegating your database-backed queue to a position of a scheduler doesn't make it less of a queue. It still has to grapple with all of the concomitant issues, such as:
The mysql-queue
library aims at providing a sane implementation of a MySQL-backed queue while addressing all of the issues above.
It is especially useful for compact monolith web applications requiring both scheduled and real-time message processing (at a smaller scale).
In real world, this library is running at the core of the free DMARC monitoring tool by Postmark responsible for sending weekly digests to the subscribers, as well as executing a handful of other repeating tasks.
All public functions are located in the mysql-queue.core
namespace:
(require '[mysql-queue.core :as queue])
(def db-conn {:subprotocol "mysql"
:subname "//localhost:3306/myapp"
:user "user"
:password "password"})
This lib uses two MySQL tables: jobs
and scheduled_jobs
. The names are not configurable.
You can create these tables when your application starts by calling initialize!
function.
; The following call is idempotent, meaning it will not attempt to create the tables if they already exist
(queue/initialize! db-conn)
Schedule jobs via schedule-job
function:
(queue/schedule-job db-conn :my-job :begin {:echo "Hello, world!"} (java.util.Date.))
Jobs are functions of two arguments: status
(Clojure keyword) and args
(anything serializable with EDN).
A job function is expected to return a vector of two elements: new status and (potentially) updated args.
Examples: [:done {}]
, [:step2 {:step1-result 42}]
.
If a job returns something that isn't a vector of 2 elements, the return value considered to be [:done nil]
.
The job is considered completed if the returned status is :done
.
Two synonym statuses for :done
are available to provide more context in logs: :canceled
and :failed
.
If the returned status isn't :done
, it will be persisted and the job function will be executed again with returned arguments.
See Advanced section for more details on this behavior.
; Job function example
(defn my-job
[status {:keys [echo] :as args}]
(println echo)
[:done args])
Symbolic job names passed to schedule-job
are associated with job functions via a user-defined bindings map:
(def jobs {:my-job my-job})
Use worker
function to create a worker that will periodically check the database for new jobs and execute
associated job functions on a thread pool. See docstring for advanced configuration options such as concurrency
settings and user-defined logging functions.
(def worker (queue/worker db-conn jobs))
To gracefully stop a worker, use stop
function.
It will immediately stop publisher and recovery threads, and will then wait for the worker threads to process all loaded jobs for up to a specified number of seconds before unblocking the calling thread.
(queue/stop worker 5)
This covers the basics. Proceed to Advanced section for more details.
Job functions used by mysql-queue
are simple state machines. If your job is a combination of side effects (such as write to a database, notify an external service, then send an email), you may need to be able to pick up the job exactly where it left off in case any one of these multiple steps fails.
Take a look at the following job function:
(defn multi-step-job
[status {data :data}]
(->> data write-to-db (send-email :new-item) save-message-id))
This (valid) job function is quite terse, but still easy to understand if you're familiar with Clojure idioms.
Unfortunately, it couples three dependent side effects together into a single operation.
Assume there is a connection error to your mail transfer agent when sending that email.
It means that a given piece of data was recorded to the database, but the email wasn't yet sent.
If we retry this job, the data will be written to the database again, resulting in a duplicate.
Even worse, if save-message-id
throws a database connection error, retrying this job will mean sending another notification.
To mitigate this problem, we could structure our job in a bit different way:
(defmulti multi-step-job (fn [status _] status))
(defmethod multi-step-job :begin
[_ {data :data}]
(let [id (write-to-db data)]
[:send-email {:id id}]))
(defmethod multi-step-job :send-email
[_ {id :id}]
(let [message-id (send-email :new-item id)]
[:save-message-id {:message-id id}]))
(defmethod multi-step-job :save-message-id
[_ {:keys [message-id]}]
(save-message-id message-id)
[:done {}])
Here we used multimethods to represent all stateful transitions in our job. It doesn't look nearly as concise as before, but the new representation reveals how many actions that threading macro was hiding. Since the library persists any results returned by the job function, even if your VM crashes in the middle of the job, the recovery will later be able to pick up from the last successfully acknowledged step.
Warning: Be careful about returning large chunks of data because all job arguments have to be serialized and stored in the database.
Copyright © 2016 Wildbit
Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.
Can you improve this documentation?Edit on GitHub
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close