
com.climate.squeedo.sqs-consumer

Functions for using Amazon Simple Queue Service (SQS) to request and perform computation.


graceful-stop-consumer! (clj)

(graceful-stop-consumer! consumer timeout-ms)

Takes a consumer created by start-consumer and tries to stop it. Waits at most timeout-ms milliseconds for the consumer to come to a complete stop. Returns the result (:timed-out or :finished).


graceful-stop-consumer!! (clj)

(graceful-stop-consumer!! consumer timeout-ms)

Like graceful-stop-consumer!, but blocks the calling thread until the result is available.
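
A minimal sketch of acting on the documented result of the blocking variant. The helper name `describe-stop-result`, the 5000 ms timeout, and the `consumer` var are illustrative assumptions, not part of the library; the call that needs squeedo and a running consumer is kept inside a `comment` form.

```clojure
(defn describe-stop-result
  "Hypothetical helper: turns the documented result keyword into a log line."
  [result]
  (case result
    :finished  "consumer stopped cleanly"
    :timed-out "consumer did not stop within the timeout"))

(comment
  ;; Requires squeedo on the classpath.
  (require '[com.climate.squeedo.sqs-consumer :as sqs-consumer])

  ;; `consumer` is assumed to be the value returned by start-consumer.
  (println (describe-stop-result
             (sqs-consumer/graceful-stop-consumer!! consumer 5000))))
```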


start-consumer (clj)

(start-consumer queue-name compute & opts)


Creates a consumer that reads messages as quickly as possible into a local buffer up
to the configured buffer size.

Work is done asynchronously by workers controlled by the size of the work buffer (currently
hardcoded to number of cpus minus 1). Bad things happen if you have workers > number of cpus.

The client is responsible for calling stop-consumer when it no longer wants to process messages.
Additionally, the client MUST send the message back on the done-channel when processing is complete.
If an uncaught exception occurs, the message will be auto-nack'd in SQS.

Failed messages are currently not acked; they go back on the queue for redelivery after the timeout.

Input:
 queue-name - the name of an sqs queue
 compute - a compute function that takes two args: a 'message' containing the body of the sqs
           message and a channel on which to ack/nack when done.

Optional arguments:
 :message-channel-size      - the number of messages to prefetch from sqs; default 20 * num-listeners
 :num-workers               - the number of workers processing messages concurrently
 :num-listeners             - the number of listeners polling from sqs; default is (num-workers / 10) because each
                              listener dequeues up to 10 messages at a time
 :listener-threads?         - run listeners in dedicated threads; if true, will create one thread per listener
 :dequeue-limit             - the number of messages to dequeue at a time; default 10
 :max-concurrent-work       - the maximum number of messages being processed at any one time.  This is mainly
                              for async workflows; default num-workers
 :client                    - the SQS client to use (if missing, sqs/mk-connection will create a client)
 :exceptional-poll-delay-ms - when an Exception is received while polling, the number of ms we wait until polling
                              again.  Default is 10000 (10 seconds).
 :worker-threads?           - run workers in dedicated threads; if true, will create one thread per worker

Output:
 a map with keys: :done-channel      - the channel to send messages to be acked
                  :message-channel   - unused by the client.
                  :ack-done-channel  - the channel closes when all messages are acked.
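
A minimal usage sketch based on the signature and docstring above. The queue name "my-sqs-queue", the option values, and the handler name `compute` are illustrative assumptions. The calls that need the squeedo library and access to SQS sit inside a `comment` form, so the rest of the block loads with only core.async on the classpath.

```clojure
(require '[clojure.core.async :as async])

(defn compute
  "Hypothetical handler: prints the message, then hands it back on
  done-channel so the consumer can ack it."
  [message done-channel]
  (println "processing" message)
  ;; Per the docstring, the message MUST go back on done-channel when done.
  (async/put! done-channel message))

(comment
  ;; Requires squeedo on the classpath and access to a real SQS queue.
  (require '[com.climate.squeedo.sqs-consumer :as sqs-consumer])

  (def consumer
    (sqs-consumer/start-consumer "my-sqs-queue" compute
                                 :num-listeners 2
                                 :max-concurrent-work 10))

  ;; When you no longer want to process messages:
  (sqs-consumer/stop-consumer consumer))
```

Because the handler receives the done-channel directly, it can also start asynchronous work and put the message on the channel later, which is the case :max-concurrent-work is described as being for.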

stop-consumer (clj)

(stop-consumer {:keys [ack-done-channel connection stop-listener-fn]})

Takes a consumer created by start-consumer and closes the channels. This should be called to stop consuming messages. Returns a channel that will close when all ongoing jobs are finished.
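
Since the return value is a channel that closes once ongoing jobs finish, a caller can wait on it with a deadline. The sketch below shows one way to do that with core.async; `await-close` is a hypothetical helper, not the library's implementation of graceful-stop-consumer!, and `consumer` is assumed to be the value returned by start-consumer.

```clojure
(require '[clojure.core.async :as async])

(defn await-close
  "Hypothetical helper: blocks until ch closes (or yields a value) or
  timeout-ms elapses. Returns :finished or :timed-out."
  [ch timeout-ms]
  (let [[_ port] (async/alts!! [ch (async/timeout timeout-ms)])]
    (if (= port ch) :finished :timed-out)))

(comment
  ;; Requires squeedo on the classpath and a running consumer.
  (require '[com.climate.squeedo.sqs-consumer :as sqs-consumer])
  (await-close (sqs-consumer/stop-consumer consumer) 5000))
```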

