This example demonstrates a Queue Worker process that polls the default queue for jobs, and how to enqueue jobs for this Queue Worker.
To run this example, you'll need:
- The clj command-line tool, version 1.10.1.697 or later, with support for the -X option. (Installation instructions)
- The Proletarian repository cloned locally: git clone git@github.com:msolli/proletarian.git. We'll assume you're in this directory going forward.

All right, let's get started.
There is a Makefile target for creating the example database. Run it with make examples.db.install. Here's how it looks:
$ make examples.db.install
DATABASE_NAME=proletarian ./database/install.sh
Installing Proletarian Database
= = =
Creating User
- - -
» proletarian role
Creating Database
- - -
» proletarian database
Creating Schema, Tables, and Index
» proletarian schema
» proletarian.job table
» proletarian.archived_job table
» proletarian.job_queue_process_at index
Granting Privileges
- - -
» schema privileges
» table privileges
= = =
Done Installing Proletarian Database
You can uninstall the example database with make examples.db.uninstall, and re-create it with make examples.db.recreate.
Please note that while the database install script allows for customization of the database name, the examples assume that the default (proletarian) is used.
You might want to exercise Proletarian with an ephemeral Postgres instance running on Docker. Here's how.
In one terminal window:
docker run -p 55432:5432 -e POSTGRES_PASSWORD=proletarian postgres
In another:
PGHOST=localhost PGPORT=55432 PGUSER=postgres PGPASSWORD=proletarian make examples.db.install
When running the examples, make sure the DATABASE_URL environment variable is set:
export DATABASE_URL="jdbc:postgresql://localhost:55432/proletarian?user=postgres&password=proletarian"
In one of your terminal windows, run this command to start the Queue Worker:
$ clj -X:examples example-a.worker/run
It should start a process that polls the default queue for jobs every 5 seconds:
$ clj -X:examples example-a.worker/run
Number of jobs in :proletarian/default queue: 0
Number of jobs in proletarian.jobs table: 0
Number of jobs in proletarian.archived_jobs table: 0
Starting worker for :proletarian/default queue with polling interval 5 s
:proletarian.worker/polling-for-jobs {:worker-thread-id 1, :proletarian.worker/queue-worker-id proletarian[:proletarian/default]}
:proletarian.worker/polling-for-jobs {:worker-thread-id 1, :proletarian.worker/queue-worker-id proletarian[:proletarian/default]}
[...and so forth, until you press Ctrl-C]
Leave this process running while you continue to step 3.
What is happening here is that we've started a Queue Worker thread pool in a JVM process that continuously polls the jobs table for new work. The polling interval is configurable; in this example it is 5 seconds. (In a production system you'd probably use a much smaller polling interval; the default is 100 ms. For this example, though, it is much easier to see what's going on with a larger polling interval.)
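To make the polling idea concrete, here is a minimal, self-contained sketch in Clojure. It is not Proletarian's implementation: an in-memory atom stands in for the jobs table, and poll-once!, run-worker!, and the :max-polls option are hypothetical names invented for this illustration.

```clojure
;; Illustrative sketch only. An atom holding a vector stands in for the
;; jobs table; all names here are hypothetical, not Proletarian's API.

(defn poll-once!
  "Atomically takes at most one job off `queue` and passes it to `handler`.
  Returns true if a job was handled, false if the queue was empty."
  [queue handler]
  (let [[old _new] (swap-vals! queue #(if (seq %) (subvec % 1) %))]
    (if-let [job (first old)]
      (do (handler job) true)
      false)))

(defn run-worker!
  "Polls `queue` up to `max-polls` times. Sleeps for `polling-interval-ms`
  only when a poll finds no work, so a backlog is drained without delay."
  [queue handler {:keys [polling-interval-ms max-polls]}]
  (dotimes [_ max-polls]
    (when-not (poll-once! queue handler)
      (Thread/sleep polling-interval-ms))))

;; Usage: two queued jobs, polled with a 5-second interval as in this example.
(comment
  (def queue (atom [{:message "first"} {:message "second"}]))
  (run-worker! queue println {:polling-interval-ms 5000 :max-polls 3}))
```

A real worker would run until it is told to stop rather than for a fixed number of polls; the bound here just keeps the sketch easy to try at a REPL.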
The code for this can be found in the example-a.worker namespace.
In your other terminal window, run this command to enqueue a job:
$ clj -X:examples example-a.enqueue-jobs/run
It should add a job to the default queue and print the job details:
$ clj -X:examples example-a.enqueue-jobs/run
Number of jobs in :proletarian/default queue: 0
Number of jobs in proletarian.jobs table: 0
Number of jobs in proletarian.archived_jobs table: 0
Adding new job to :proletarian/default queue:
{:job-id #uuid "b78295d5-9241-45c1-8288-e4cf128d6bea",
:job-type :example-a.enqueue-jobs/echo,
:payload {:message "Hello world!",
:timestamp #inst "2021-01-06T20:43:42.451299Z"}}
You should see this job get picked up by the queue worker process in the other terminal window. You'll see output like this:
:proletarian.worker/handling-job {:job-id #uuid "b78295d5-9241-45c1-8288-e4cf128d6bea", :job-type :example-a.enqueue-jobs/echo, :attempt 1, :worker-thread-id 1, :proletarian.worker/queue-worker-id proletarian[:proletarian/default]}
Running job :example-a.enqueue-jobs/echo. Payload:
{:message "Hello world!", :timestamp #inst "2021-01-06T20:43:42.451Z"}
:proletarian.worker/job-finished {:job-id #uuid "b78295d5-9241-45c1-8288-e4cf128d6bea", :job-type :example-a.enqueue-jobs/echo, :attempt 1, :worker-thread-id 1, :proletarian.worker/queue-worker-id proletarian[:proletarian/default]}
Run the same command again to see the miracle unfold one more time.
Things to note:
- The job has a job-id, which is a UUID. You shouldn't need to keep track of this id; it's just the unique id of this job.
- The job has a :job-type, which is a Clojure keyword. This keyword determines which handler function Proletarian will invoke for this job. You need to implement the proletarian.job/handle-job! multimethod for each job type you want to handle. A common practice is to use a namespaced keyword with shorthand notation for the current namespace (the double colon: ::) as your job type. In this example we use ::echo in the code, which expands to :example-a.enqueue-jobs/echo, as seen in the output above.
- The payload is serialized and stored in a TEXT column in the database. Proletarian de-serializes the payload and passes it to the handler. The serializer is configurable: just implement the proletarian.protocols/Serializer protocol and pass an instance of it under the :proletarian/serializer option to proletarian.job/enqueue! and proletarian.worker/queue-worker-controller.
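The job-type dispatch described above can be sketched in plain Clojure. This is a minimal illustration under stated assumptions: the handle-job! multimethod defined here is a local, hypothetical stand-in, not proletarian.job/handle-job!, and the real multimethod's argument list may differ. The example.jobs namespace is also made up for the sketch.

```clojure
(ns example.jobs)

;; Hypothetical stand-in for a job-handling multimethod. It dispatches on
;; the job type keyword, which is the first argument.
(defmulti handle-job!
  (fn [job-type _payload] job-type))

;; ::echo is shorthand for a keyword in the current namespace, so here it
;; reads as :example.jobs/echo.
(defmethod handle-job! ::echo
  [_job-type payload]
  (str "Running job. Message: " (:message payload)))

;; Fallback for job types that have no handler.
(defmethod handle-job! :default
  [job-type _payload]
  (throw (ex-info "No handler for job type" {:job-type job-type})))

;; Both forms name the same keyword, so both reach the ::echo handler.
(handle-job! ::echo {:message "Hello world!"})
(handle-job! :example.jobs/echo {:message "Hello world!"})
```

Because the job type is a fully qualified keyword, the code that enqueues a job and the code that handles it only need to agree on that keyword, not share a function reference.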