(create-queue-worker data-source handler-fn)
(create-queue-worker
  data-source
  handler-fn
  {:proletarian/keys
   [queue job-table archived-job-table serializer uuid-serializer log
    handler-fn-mode retry-strategy-fn failed-job-fn queue-worker-id
    polling-interval-ms worker-threads on-polling-error
    await-termination-timeout-ms install-jvm-shutdown-hook? on-shutdown clock]
   :or {clock (Clock/systemUTC)
        on-shutdown (fn [])
        log log/println-logger
        await-termination-timeout-ms 10000
        polling-interval-ms 100
        archived-job-table db/DEFAULT_ARCHIVED_JOB_TABLE
        install-jvm-shutdown-hook? false
        uuid-serializer (pg-uuid/create-serializer)
        queue db/DEFAULT_QUEUE
        failed-job-fn (constantly nil)
        serializer (transit/create-serializer)
        retry-strategy-fn (constantly nil)
        handler-fn-mode :default
        worker-threads 1
        on-polling-error (constantly true)
        job-table db/DEFAULT_JOB_TABLE}})
Create and return a Queue Worker, which is an instance of `proletarian.protocols/QueueWorker`. After creation, the Queue Worker must be started using `start!`, and can be stopped using `stop!`.

### Arguments

* `data-source` – a [javax.sql.DataSource](https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/javax/sql/DataSource.html) factory for creating connections to the PostgreSQL database.
* `handler-fn` – the function that will be called when a job is pulled off the queue. By default this should be an arity-2 function or multimethod. The first argument is the job type (as provided to `proletarian.job/enqueue!`). The second argument is the job's payload (again, as provided to `proletarian.job/enqueue!`). This mode of calling the handler-fn can be changed with the `:proletarian/handler-fn-mode` option, see below.
* `options` – an optional map describing configuration options, see below.

### Options

The optional third argument is an options map with the following keys, all optional with default values:

* `:proletarian/queue` – a keyword with the name of the queue. The default value is `:proletarian/default`.
* `:proletarian/job-table` – which PostgreSQL table to write the job to. The default is `proletarian.job`. You should only have to override this if you changed the default table name during installation.
* `:proletarian/archived-job-table` – which PostgreSQL table to write archived jobs to. The default is `proletarian.archived_job`. You should only have to override this if you changed the default table name during installation.
* `:proletarian/serializer` – an implementation of the `proletarian.protocols/Serializer` protocol. The default is a Transit serializer (see `proletarian.transit/create-serializer`). If you override this, you should use the same serializer for `proletarian.job/enqueue!`.
* `:proletarian/handler-fn-mode` – a keyword that specifies how the `handler-fn` will be called. The default is `:default`. Possible values:
  - `:default` – Call the `handler-fn` with two arguments, the job type and the payload (see `handler-fn` above).
  - `:advanced` – Call the `handler-fn` with one argument, a map with the job's attributes: `:proletarian.job/job-type`, `:proletarian.job/payload`, `:proletarian.job/job-id`, `:proletarian.job/queue`, `:proletarian.job/enqueued-at`, `:proletarian.job/process-at` and `:proletarian.job/attempts`.
* `:proletarian/retry-strategy-fn` – a function that will be called to provide the __retry strategy__ for a job if it fails. It should be an arity-2 function or multimethod. The first argument is a map with the job's attributes: `:proletarian.job/job-type`, `:proletarian.job/payload`, `:proletarian.job/job-id`, `:proletarian.job/queue`, `:proletarian.job/enqueued-at`, `:proletarian.job/process-at` and `:proletarian.job/attempts`. The second argument is the exception that caused the job to fail. It should return a map that specifies the [retry strategy](/readme#retries).
* `:proletarian/failed-job-fn` – a function that will be called when a job has failed after retries. It should be an arity-2 function or multimethod. The first argument is a map with the job's attributes (see `:proletarian/retry-strategy-fn`). The second argument is the exception that caused the job to fail the last time.
* `:proletarian/log` – a logger function that Proletarian calls whenever anything interesting happens during operation. It takes two arguments: The first is a keyword identifying the event being logged. The second is a map with data describing the event. The default logging function is simply a println-logger that will print every event using `println`.
* `:proletarian/queue-worker-id` – a string identifying this Queue Worker. It is used as a thread prefix for names of threads in the thread pool. It is also added to the log event data under the key `:proletarian.worker/queue-worker-id`. The default value is computed from the name of the queue that this worker is getting jobs from.
* `:proletarian/polling-interval-ms` – the time in milliseconds to wait after a job is finished before polling for a new one. The default value is 100 milliseconds.
* `:proletarian/worker-threads` – the number of worker threads that work in parallel. The default value is 1.
* `:proletarian/on-polling-error` – a function that Proletarian calls when a Throwable is thrown during polling for jobs. It takes one argument, the Throwable that was thrown. If it returns a truthy value, the Queue Worker is stopped. The default behavior is to stop the Queue Worker.
* `:proletarian/await-termination-timeout-ms` – the time in milliseconds to wait for jobs to finish before throwing an error when shutting down the thread pool. The default value is 10000 (10 seconds).
* `:proletarian/install-jvm-shutdown-hook?` – should Proletarian install a JVM shutdown hook that tries to stop the Queue Worker (using `stop!`) when the JVM is shut down? The default is `false`.
* `:proletarian/on-shutdown` – a function that Proletarian calls after the Queue Worker has shut down successfully. It takes no arguments, and the return value is discarded. The default function is a no-op.
* `:proletarian/clock` – the [java.time.Clock](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/Clock.html) to use for getting the current time. Used in testing. The default is `java.time.Clock/systemUTC`.
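
To make the arguments and options above more concrete, here is a minimal sketch of a handler multimethod, a retry strategy function and a logger, wired into a Queue Worker. Several things are assumptions and not taken from this page: the job type `:example/send-email`, the payload shape, the `example.worker-setup` namespace, the `collecting-logger` helper, and the `:retries`/`:delays` keys of the retry strategy map (this page only says the function returns "a map that specifies the retry strategy", so check the README's retries section for the exact shape). The calls into `proletarian.worker` sit in a `comment` form because they need a real `javax.sql.DataSource`.

```clojure
(ns example.worker-setup)

;; Default handler mode: an arity-2 multimethod dispatching on the job type.
;; The job type :example/send-email and its payload are hypothetical.
(defmulti handle-job!
  (fn [job-type _payload] job-type))

(defmethod handle-job! :example/send-email
  [_job-type {:keys [to]}]
  (str "sent email to " to))

(defmethod handle-job! :default
  [job-type _payload]
  (throw (ex-info "No handler for job type" {:job-type job-type})))

;; Arity-2 retry strategy fn: receives the job attribute map and the exception.
;; ASSUMPTION: the strategy map uses :retries and :delays (milliseconds).
;; Returning nil matches the documented default, (constantly nil).
(defn retry-strategy
  [job _exception]
  (when (= :example/send-email (:proletarian.job/job-type job))
    {:retries 3
     :delays  [1000 5000]}))

;; Two-argument logger (event keyword, data map) that records events in an
;; atom instead of printing them. Hypothetical helper, handy in tests.
(defn collecting-logger
  [sink]
  (fn [event data]
    (swap! sink conj [event data])))

(comment
  ;; `data-source` is assumed to be a javax.sql.DataSource built elsewhere.
  (require '[proletarian.worker :as worker])

  (def events (atom []))

  (def queue-worker
    (worker/create-queue-worker
      data-source
      handle-job!
      {:proletarian/queue             :proletarian/default
       :proletarian/worker-threads    2
       :proletarian/retry-strategy-fn retry-strategy
       :proletarian/log               (collecting-logger events)}))

  (worker/start! queue-worker)
  (worker/stop! queue-worker))
```

Keeping the handler, retry strategy and logger as plain functions means each can be exercised at the REPL without a database; only the `comment` form touches Proletarian itself.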
(process-next-job! data-source queue handler-fn log config)
Gets the next job from the database table and runs it.

This function is part of the internal machinery of the Proletarian worker, but is being exposed as a public function for use in testing scenarios and in the REPL. No default values are provided for any of the arguments or configuration options. See the documentation for, and implementation of, `create-queue-worker` for what those default values are. It might be a good idea to create a wrapper function around this function, for use in your own application, that provides sensible values for all the arguments and config.
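
One possible shape for such a wrapper is sketched below. It is only an illustration: `make-process-next-job` and the `example.process-next` namespace are hypothetical names, and the function to call is passed in as `process-fn` so the wrapper can be tried with a stub. In the `comment` form, passing the table names as strings, using the PostgreSQL UUID serializer, and reusing the `handle-job!` handler are assumptions based on the defaults listed for `create-queue-worker`.

```clojure
(ns example.process-next)

(defn make-process-next-job
  "Returns a function that calls `process-fn` (expected to be
  proletarian.worker/process-next-job!) with the fixed arguments in
  `defaults`. The returned function takes a handler-fn and, optionally,
  a map of config overrides that is merged over the default config."
  [process-fn {:keys [data-source queue log config]}]
  (fn process-next
    ([handler-fn]
     (process-next handler-fn {}))
    ([handler-fn config-overrides]
     (process-fn data-source queue handler-fn log
                 (merge config config-overrides)))))

(comment
  ;; `data-source` and `handle-job!` are assumed to exist in your application.
  (require '[proletarian.worker :as worker]
           '[proletarian.transit :as transit]
           '[proletarian.uuid.postgresql :as pg-uuid])

  (def process-next!
    (make-process-next-job
      worker/process-next-job!
      {:data-source data-source
       :queue       :proletarian/default
       :log         (fn [event data] (println event data))
       :config      {:proletarian.db/job-table          "proletarian.job"
                     :proletarian.db/archived-job-table "proletarian.archived_job"
                     :proletarian.db/serializer         (transit/create-serializer)
                     :proletarian/uuid-serializer       (pg-uuid/create-serializer)
                     :proletarian.worker/clock          (java.time.Clock/systemUTC)}}))

  ;; Run a single job from the REPL or a test.
  (process-next! handle-job!))
```

Passing a config override, for example a fixed `java.time.Clock` under `:proletarian.worker/clock`, lets a test control the current time without rebuilding the whole wrapper.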

### Arguments

* `data-source` – a [javax.sql.DataSource](https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/javax/sql/DataSource.html) factory for creating connections to the PostgreSQL database.
* `queue` – a keyword with the name of the queue.
* `handler-fn` – the function that will be called when a job is pulled off the queue. It should be an arity-2 function or multimethod. The first argument is the job type (as provided to `proletarian.job/enqueue!`). The second argument is the job's payload (again, as provided to `proletarian.job/enqueue!`).
* `log` – a logger function that Proletarian calls whenever anything interesting happens during operation. It takes two arguments: The first is a keyword identifying the event being logged. The second is a map with data describing the event.
* `config` – a map describing configuration options, see below.

### Config

* `:proletarian.db/job-table` – which PostgreSQL table to write the job to.
* `:proletarian.db/archived-job-table` – which PostgreSQL table to write archived jobs to.
* `:proletarian.db/serializer` – an implementation of the `proletarian.protocols/Serializer` protocol. You should use the same serializer for `proletarian.job/enqueue!`.
* `:proletarian/uuid-serializer` – an implementation of the `proletarian.protocols/UuidSerializer` protocol. Its role is to help in the serializing and deserializing of UUIDs to accommodate various database requirements. It defaults to `proletarian.uuid.postgresql/create-serializer`. A `proletarian.uuid.mysql/create-serializer` is available if you wish to use MySQL with this library. If you override the default, you should use the same serializer for `proletarian.job/enqueue!`.
* `:proletarian.worker/clock` – a [java.time.Clock](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/Clock.html) instance to use for getting the current time.

Returns true if there was a job to be run, and the current thread did not receive an interrupt while handling the job. Returns false if there was an interrupt. Returns nil if there was no job to be run.
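
The three return values described above lend themselves to a small "drain the queue" loop for tests. The sketch below is an assumption about how one might use them, not part of Proletarian: `drain-queue!` and the `example.drain` namespace are hypothetical, and `process-one!` stands for any zero-argument function that calls `process-next-job!` with your own arguments.

```clojure
(ns example.drain)

(defn drain-queue!
  "Calls `process-one!` repeatedly until it stops returning true.
  Returns a map with the number of jobs that completed without an
  interrupt and the reason the loop stopped."
  [process-one!]
  (loop [completed 0]
    (let [result (process-one!)]
      (cond
        ;; true: a job ran and the thread was not interrupted, keep going
        (true? result)  (recur (inc completed))
        ;; false: the thread was interrupted while handling a job
        (false? result) {:completed completed :stopped-by :interrupt}
        ;; nil: there was no job to run
        :else           {:completed completed :stopped-by :empty-queue}))))
```

In a test you would typically enqueue a few jobs, call `drain-queue!`, and then assert on the side effects of your handler.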