Liking cljdoc? Tell your friends :D

proletarian.worker


create-queue-workerclj

(create-queue-worker data-source handler-fn)
(create-queue-worker
  data-source
  handler-fn
  {:proletarian/keys
     [queue job-table archived-job-table serializer uuid-serializer log
      handler-fn-mode retry-strategy-fn failed-job-fn queue-worker-id
      polling-interval-ms worker-threads on-polling-error
      await-termination-timeout-ms install-jvm-shutdown-hook? on-shutdown clock]
   :or {clock (Clock/systemUTC)
        on-shutdown (fn [])
        log log/println-logger
        await-termination-timeout-ms 10000
        polling-interval-ms 100
        archived-job-table db/DEFAULT_ARCHIVED_JOB_TABLE
        install-jvm-shutdown-hook? false
        uuid-serializer (pg-uuid/create-serializer)
        queue db/DEFAULT_QUEUE
        failed-job-fn (constantly nil)
        serializer (transit/create-serializer)
        retry-strategy-fn (constantly nil)
        handler-fn-mode :default
        worker-threads 1
        on-polling-error (constantly true)
        job-table db/DEFAULT_JOB_TABLE}})

Create and return a Queue Worker, which is an instance of proletarian.protocols/QueueWorker. After creation, the Queue Worker must be started using start!, and can be stopped using stop!.

Arguments

  • data-source – a javax.sql.DataSource factory for creating connections to the PostgreSQL database.
  • handler-fn – the function that will be called when a job is pulled off the queue. By default this should be an arity-2 function or multimethod. The first argument is the job type (as provided to proletarian.job/enqueue!). The second argument is the job's payload (again, as provided to proletarian.job/enqueue!). This mode of calling the handler-fn can be changed with the :proletarian/handler-fn-mode, see below.
  • options – an optional map describing configuration options, see below.

Options

The optional third argument is an options map with the following keys, all optional with default values:

  • :proletarian/queue – a keyword with the name of the queue. The default value is :proletarian/default.
  • :proletarian/job-table – which PostgreSQL table to write the job to. The default is proletarian.job. You should only have to override this if you changed the default table name during installation.
  • :proletarian/archived-job-table – which PostgreSQL table to write archived jobs to. The default is proletarian.archived_job. You should only have to override this if you changed the default table name during installation.
  • :proletarian/serializer – an implementation of the proletarian.protocols/Serializer protocol. The default is a Transit serializer (see proletarian.transit/create-serializer). If you override this, you should use the same serializer for proletarian.job/enqueue!.
  • :proletarian/handler-fn-mode – a keyword that specifies how the handler-fn will be called. The default is :default. Possible values:
    • :default – Call the handler-fn with two arguments, the job type and the payload (see handler-fn above)
    • :advanced - Call the handler-fn with one argument, a map with the job's attributes: :proletarian.job/job-type, :proletarian.job/payload, :proletarian.job/job-id, :proletarian.job/queue, :proletarian.job/enqueued-at, :proletarian.job/process-at and :proletarian.job/attempts.
  • :proletarian/retry-strategy-fn – a function that will be called to provide the retry strategy for a job if it fails. It should be an arity-2 function or multimethod. The first argument is a map with the job's attributes: :proletarian.job/job-type, :proletarian.job/payload, :proletarian.job/job-id, :proletarian.job/queue, :proletarian.job/enqueued-at, :proletarian.job/process-at and :proletarian.job/attempts. The second argument is the exception that caused the job to fail. It should return a map that specifies the retry strategy.
  • :proletarian/failed-job-fn – a function that will be called when a job has failed after retries. It should be an arity-2 function or multimethod. The first argument is a map with the job's attributes (see :proletarian/retry-strategy-fn). The second argument is the exception that caused the job to fail the last time.
  • :proletarian/log – a logger function that Proletarian calls whenever anything interesting happens during operation. It takes two arguments: The first is a keyword identifying the event being logged. The second is a map with data describing the event. The default logging function is simply a println-logger that will print every event using println.
  • :proletarian/queue-worker-id – a string identifying this Queue Worker. It is used as a thread prefix for names of threads in the thread pool. It is also added to the log event data under the key :proletarian.worker/queue-worker-id. The default value is computed from the name of the queue that this worker is getting jobs from.
  • :proletarian/polling-interval-ms – the time in milliseconds to wait after a job is finished before polling for a new one. The default value is 100 milliseconds.
  • :proletarian/worker-threads – the number of worker threads that work in parallel. The default value is 1.
  • :proletarian/on-polling-error – a function that Proletarian calls when a Throwable is thrown during polling for jobs. It takes one argument, the Throwable that was thrown. If it returns a truthy value, the Queue Worker is stopped. The default behavior is to stop the Queue Worker.
  • :proletarian/await-termination-timeout-ms – the time in milliseconds to wait for jobs to finish before throwing an error when shutting down the thread pool. The default value is 10000 (10 seconds).
  • :proletarian/install-jvm-shutdown-hook? – should Proletarian install a JVM shutdown hook that tries to stop the Queue Worker (using stop!) when the JVM is shut down? The default is false.
  • :proletarian/on-shutdown – a function that Proletarian calls after the Queue Worker has shut down successfully. It takes no arguments, and the return value is discarded. The default function is a no-op.
  • :proletarian/clock – the [[java.time.Clock]] to use for getting the current time. Used in testing. The default is [[java.time.Clock/systemUTC]].
Create and return a Queue Worker, which is an instance of [[proletarian.protocols/QueueWorker]]. After creation, the
Queue Worker must be started using [[start!]], and can be stopped using [[stop!]].

### Arguments
* `data-source` – a [javax.sql.DataSource](https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/javax/sql/DataSource.html)
   factory for creating connections to the PostgreSQL database.
* `handler-fn` – the function that will be called when a job is pulled off the queue. By default this should be an
   arity-2 function or multimethod. The first argument is the job type (as provided to [[proletarian.job/enqueue!]]).
   The second argument is the job's payload (again, as provided to [[proletarian.job/enqueue!]]). This mode of
   calling the handler-fn can be changed with the `:proletarian/handler-fn-mode`, see below.
* `options` – an optional map describing configuration options, see below.

### Options
The optional third argument is an options map with the following keys, all optional with default values:
* `:proletarian/queue` – a keyword with the name of the queue. The default value is `:proletarian/default`.
* `:proletarian/job-table` – which PostgreSQL table to write the job to. The default is `proletarian.job`. You should
    only have to override this if you changed the default table name during installation.
* `:proletarian/archived-job-table` – which PostgreSQL table to write archived jobs to. The default is
    `proletarian.archived_job`. You should only have to override this if you changed the default table name during
    installation.
* `:proletarian/serializer` – an implementation of the [[proletarian.protocols/Serializer]] protocol. The default is
    a Transit serializer (see [[proletarian.transit/create-serializer]]). If you override this, you should use the
    same serializer for [[proletarian.job/enqueue!]].
* `:proletarian/handler-fn-mode` – a keyword that specifies how the `handler-fn` will be called. The default is
    `:default`. Possible values:
    - `:default` – Call the `handler-fn` with two arguments, the job type and the payload (see `handler-fn` above)
    - `:advanced` - Call the `handler-fn` with one argument, a map with the job's attributes:
        `:proletarian.job/job-type`, `:proletarian.job/payload`, `:proletarian.job/job-id`, `:proletarian.job/queue`,
        `:proletarian.job/enqueued-at`, `:proletarian.job/process-at` and `:proletarian.job/attempts`.
* `:proletarian/retry-strategy-fn` – a function that will be called to provide the __retry strategy__ for a job if it
    fails. It should be an arity-2 function or multimethod. The first argument is a map with the job's attributes:
    `:proletarian.job/job-type`, `:proletarian.job/payload`, `:proletarian.job/job-id`, `:proletarian.job/queue`,
    `:proletarian.job/enqueued-at`, `:proletarian.job/process-at` and `:proletarian.job/attempts`. The second
    argument is the exception that caused the job to fail. It should return a map that specifies the
    [retry strategy](/readme#retries).
* `:proletarian/failed-job-fn` – a function that will be called when a job has failed after retries. It should be an
    arity-2 function or multimethod. The first argument is a map with the job's attributes (see
    `:proletarian/retry-strategy-fn`). The second argument is the exception that caused the job to fail the last
    time.
* `:proletarian/log` – a logger function that Proletarian calls whenever anything interesting happens during
    operation. It takes two arguments: The first is a keyword identifying the event being logged. The second is a map
    with data describing the event. The default logging function is simply a println-logger that will print
    every event using `println`.
* `:proletarian/queue-worker-id` – a string identifying this Queue Worker. It is used as a thread prefix for names of
    threads in the thread pool. It is also added to the log event data under the key
    `:proletarian.worker/queue-worker-id`. The default value is computed from the name of the queue that this worker
    is getting jobs from.
* `:proletarian/polling-interval-ms` – the time in milliseconds to wait after a job is finished before polling for a
    new one. The default value is 100 milliseconds.
* `:proletarian/worker-threads` – the number of worker threads that work in parallel. The default value is 1.
* `:proletarian/on-polling-error` – a function that Proletarian calls when a Throwable is thrown during polling for
    jobs. It takes one argument, the Throwable that was thrown. If it returns a truthy value, the Queue Worker is
    stopped. The default behavior is to stop the Queue Worker.
* `:proletarian/await-termination-timeout-ms` – the time in milliseconds to wait for jobs to finish before throwing
    an error when shutting down the thread pool. The default value is 10000 (10 seconds).
* `:proletarian/install-jvm-shutdown-hook?` – should Proletarian install a JVM shutdown hook that tries to stop the
    Queue Worker (using [[stop!]]) when the JVM is shut down? The default is `false`.
* `:proletarian/on-shutdown` – a function that Proletarian calls after the Queue Worker has shut down successfully.
    It takes no arguments, and the return value is discarded. The default function is a no-op.
* `:proletarian/clock` – the [[java.time.Clock]] to use for getting the current time. Used in testing. The default is
    [[java.time.Clock/systemUTC]].
sourceraw docstring

process-next-job!clj

(process-next-job! data-source queue handler-fn log config)

Gets the next job from the database table and runs it.

This function is part of the internal machinery of the Proletarian worker, but is being exposed as a public function for use in testing scenarios and in the REPL. No default values are provided for any of the arguments or configuration options. See the documentation for, and implementation of, create-queue-worker for what those default values are. It might be a good idea to create a wrapper function around this function, for use in your own application, that provides sensible values for all the arguments and config.

Arguments

  • data-source – a javax.sql.DataSource factory for creating connections to the PostgreSQL database.
  • queue – a keyword with the name of the queue.
  • handler-fn – the function that will be called when a job is pulled off the queue. It should be an arity-2 function or multimethod. The first argument is the job type (as provided to proletarian.job/enqueue!). The second argument is the job's payload (again, as provided to proletarian.job/enqueue!).
  • log – a logger function that Proletarian calls whenever anything interesting happens during operation. It takes two arguments: The first is a keyword identifying the event being logged. The second is a map with data describing the event.
  • config – a map describing configuration options, see below.

Config

  • :proletarian.db/job-table – which PostgreSQL table to write the job to.
  • :proletarian.db/archived-job-table – which PostgreSQL table to write archived jobs to.
  • :proletarian.db/serializer – an implementation of the proletarian.protocols/Serializer protocol. You should use the same serializer for proletarian.job/enqueue!.
  • :proletarian/uuid-serializer - an implementation of the [[proletarian.protocols/UuidSerializer]] protocol. Its role is to help in the serializing and deserializing of UUIDs to accomodate various database requirements. It defaults toproletarian.uuid.postgresql/create-serializer. Aproletarian.uuid.mysql/create-serializer` is available if you wish to use MySQL with this library. If you override the default, you should use the same serializer for proletarian.job/enqueue!.
  • :proletarian.worker/clock – a java.time.Clock instance to use for getting the current time.

Returns true if there was a job to be run, and the current thread did not receive an interrupt while handling the job. Returns false if there was an interrupt. Returns nil if there was no job to be run.

Gets the next job from the database table and runs it.

This function is part of the internal machinery of the Proletarian worker, but is being exposed as a public function
for use in testing scenarios and in the REPL. No default values are provided for any of the arguments or
configuration options. See the documentation for, and implementation of, [[create-queue-worker]] for what those
default values are. It might be a good idea to create a wrapper function around this function, for use in your own
application, that provides sensible values for all the arguments and config.

### Arguments
* `data-source` – a [javax.sql.DataSource](https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/javax/sql/DataSource.html)
    factory for creating connections to the PostgreSQL database.
* `queue` – a keyword with the name of the queue.
* `handler-fn` – the function that will be called when a job is pulled off the queue. It should be an arity-2
    function or multimethod. The first argument is the job type (as provided to [[proletarian.job/enqueue!]]). The
    second argument is the job's payload (again, as provided to [[proletarian.job/enqueue!]]).
* `log` – a logger function that Proletarian calls whenever anything interesting happens during operation. It takes
    two arguments: The first is a keyword identifying the event being logged. The second is a map with data
    describing the event.
* `config` – a map describing configuration options, see below.

### Config
* `:proletarian.db/job-table` – which PostgreSQL table to write the job to.
* `:proletarian.db/archived-job-table` – which PostgreSQL table to write archived jobs to.
* `:proletarian.db/serializer` – an implementation of the [[proletarian.protocols/Serializer]] protocol. You should
    use the same serializer for [[proletarian.job/enqueue!]].
* `:proletarian/uuid-serializer` - an implementation of the `[[proletarian.protocols/UuidSerializer]] protocol.
    Its role is to help in the serializing and deserializing of UUIDs to accomodate various database
    requirements. It defaults to `proletarian.uuid.postgresql/create-serializer`. A `proletarian.uuid.mysql/create-serializer`
    is available if you wish to use MySQL with this library. If you override the default, you should use the same
    serializer for [[proletarian.job/enqueue!]].
* `:proletarian.worker/clock` – a [java.time.Clock](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/Clock.html)
    instance to use for getting the current time.

Returns true if there was a job to be run, and the current thread did not receive an interrupt while handling the
job. Returns false if there was an interrupt.
Returns nil if there was no job to be run.
sourceraw docstring

start!clj

(start! queue-worker)

Start the Queue Worker.

Start the Queue Worker.
sourceraw docstring

stop!clj

(stop! queue-worker)

Stop the Queue Worker.

Stop the Queue Worker.
sourceraw docstring

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close