
com.climate.squeedo.sqs-consumer

Functions for using Amazon Simple Queueing Service to request and perform computation.


start-consumer (clj)

(start-consumer queue-name compute & opts)

Creates a consumer that reads messages as quickly as possible into a local buffer up to the configured buffer size.

Work is done asynchronously by workers, whose number is controlled by the size of the work buffer (currently hardcoded to the number of CPUs minus 1). Bad things happen if you have more workers than CPUs.

The client is responsible for calling stop-consumer when it no longer wants to process messages. Additionally, the client MUST send the message back on the done-channel when processing is complete; if an uncaught exception occurs, the message will be auto-nack'd in SQS.

Failed messages are currently not acked; instead they go back on the queue for redelivery after the timeout.

This code is atom free :)

Input:
 queue-name - the name of an sqs queue
 compute    - a compute function that takes two args: a 'message' containing the body of the sqs
              message and a channel on which to ack/nack when done.

Optional arguments:
 :message-channel-size      - the number of messages to prefetch from sqs; default 20 * num-listeners
 :num-workers               - the number of workers processing messages concurrently
 :num-listeners             - the number of listeners polling from sqs; default is (num-workers / 10) because each
                              listener dequeues up to 10 messages at a time
 :listener-threads?         - run listeners in dedicated threads; if true, will create one thread per listener
 :dequeue-limit             - the number of messages to dequeue at a time; default 10
 :max-concurrent-work       - the maximum number of total messages processed. This is mainly for async workflows;
                              default num-workers
 :client                    - the SQS client to use (if missing, sqs/mk-connection will create a client)
 :exceptional-poll-delay-ms - when an Exception is received while polling, the number of ms we wait until polling
                              again. Default is 10000 (10 seconds).

Output:
 a map with keys, :done-channel    - the channel to send messages to be acked
                  :message-channel - unused by the client.
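The contract described above (a two-argument compute function that hands the message back on the done channel) might look roughly like the following. This is a minimal sketch, not the library's canonical usage: the queue name "my-sqs-queue", the option values, and the function name compute are placeholders chosen for illustration, and the calls inside the comment form assume the squeedo dependency and working AWS credentials.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical compute function. It receives the message and the channel
;; on which the message must be handed back when processing is complete.
(defn compute
  [message done-channel]
  (println "processing" message)
  ;; Hand the message back so the consumer can ack it in SQS.
  (async/put! done-channel message))

(comment
  ;; Evaluate at a REPL. Needs the squeedo dependency and AWS credentials.
  (require '[com.climate.squeedo.sqs-consumer :as consumer])

  ;; Placeholder queue name and option values.
  (def my-consumer
    (consumer/start-consumer "my-sqs-queue" compute
                             :num-listeners 2
                             :max-concurrent-work 20))

  ;; When no more messages should be processed:
  (consumer/stop-consumer my-consumer))
```

Keeping the SQS calls inside a comment form means the compute function can be loaded and exercised with a plain core.async channel, without touching a real queue.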


stop-consumer (clj)

(stop-consumer {:keys [done-channel message-channel]})

Takes a consumer created by start-consumer and closes the channels. This should be called to stop consuming messages.
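To illustrate what closing the channels means in core.async terms, here is a small sketch that runs without SQS. The names fake-consumer and close-consumer-channels are hypothetical stand-ins, written only from the signature and description above. They are not the library's source.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for the map returned by start-consumer. The real
;; channels are created by the library.
(defn fake-consumer []
  {:done-channel    (async/chan 10)
   :message-channel (async/chan 10)})

;; Sketch of the documented behaviour: destructure the consumer map and
;; close both channels. An assumption, not the actual implementation.
(defn close-consumer-channels
  [{:keys [done-channel message-channel]}]
  (async/close! message-channel)
  (async/close! done-channel))

(def stopped (doto (fake-consumer) close-consumer-channels))

;; Takes from a closed, empty channel return nil; puts are rejected.
(async/<!! (:message-channel stopped))          ; => nil
(async/>!! (:done-channel stopped) {:body "x"}) ; => false
```

Under this reading, work that tries to ack on the done channel after the consumer has been stopped would have its put rejected, which is one reason to stop the consumer only once processing is no longer wanted.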

