Liking cljdoc? Tell your friends :D

gluttony.core


start-consumerclj

(start-consumer queue-url consume & [opts])

Creates a consumer and run it.

The consumer mainly run two type of processes based on core.async/go-loop.

  1. Receiver Receiver receives messages from AWS SQS and push them into a local buffer, core.async/channel, as quickly as possible up to the configured buffer size.

  2. Worker Worker takes a message which the receivers push and invokes the consume function which you pass as the argument.

The client MUST call stop-consumer when you no longer want to process messages.

Input: queue-url - the url of an AWS SQS queue used as the producer. consume - consume is a function that takes three arguments: 1. a 'message' is a record of gluttony.record.message/SQSMessage which contains the body of the sqs message. 2. a function when it MUST be invoked when the consume success. it takes no arguments. 3. a function when it MUST be invoked when the consume fail. it takes zero or one argument. the argument decides how long to delay in seconds for retrying. 0 to 43200. Maximum: 12 hours. when you pass no delay time, retrying as soon as possible. like this: (defn consume [message respond raise] (let [success? (...your computation uses the message...)] (if success? (respond) (raise 10))))

Optional arguments: :client - the SQS client, which is the instance of cognitect.aws.client.Client. if missing, cognitect.aws.client.api/client would be called. :num-workers - the number of workers processing messages concurrently. default: (Runtime/availableProcessors) - 1 :num-receivers - the number of receivers polling from sqs. default: (num-workers / 10) because each receiver is able to receive up to 10 messages at a time. :message-channel-size - the number of messages to prefetch from sqs. default: 20 * num-receivers :receive-limit - the number of messages to receive at a time. 1 to 10. default: 10 :consume-limit - the number of processing messages at the same time. 0 to 1024 If the consume run asynchronously, for instance inside go block, you may want to use this option. default: 0, which means gluttony doesn't care about how many message are processed simultaneously. :long-polling-duration - the duration (in seconds) for which the call waits for a message to arrive in the queue before returning. 0 to 20. default: 20 :exceptional-poll-delay-ms - when an Exception is received while polling, receiver wait for the number of ms until polling again. default: 10000 (10 seconds). :heartbeat - the duration (in seconds) for which the consumer extends message visibility if the message is being processed. 1 to 43199. default: nil If it isn't set, heartbeat doesn't work. If it's set, :heartbeat-timeout is required. Refer to AWS documents for more detail: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/working-with-messages.html If you set this option and :consume-limit, recommend to make :consume-limit bigger than :message-channel-size so as not to block heartbeat requests. :heartbeat-timeout - the timeout (in seconds) of heartbeat. If your consume function doesn't call respond or raise within heartbeat timeout, the consumer doesn't extend message visibility any more. 2 to 43200. default: nil :heartbeat-timeout must be longer than :heartbeat. Output: a instance of gluttony.record.consumer.Consumer

Creates a consumer and run it.

The consumer mainly run two type of processes based on core.async/go-loop.

1. Receiver
Receiver receives messages from AWS SQS and push them into a local buffer, core.async/channel,
as quickly as possible up to the configured buffer size.

2. Worker
Worker takes a message which the receivers push and invokes the consume function which you pass
as the argument.

The client MUST call stop-consumer when you no longer want to process messages.

Input:
 queue-url - the url of an AWS SQS queue used as the producer.
 consume - consume is a function that takes three arguments:
           1. a 'message' is a record of gluttony.record.message/SQSMessage which contains the
           body of the sqs message.
           2. a function when it MUST be invoked when the consume success. it takes no arguments.
           3. a function when it MUST be invoked when the consume fail. it takes zero or one
           argument. the argument decides how long to delay in seconds for retrying. 0 to 43200.
           Maximum: 12 hours. when you pass no delay time, retrying as soon as possible.
           like this:
           (defn consume [message respond raise]
             (let [success? (...your computation uses the message...)]
               (if success?
                 (respond)
                 (raise 10))))

Optional arguments:
 :client                    - the SQS client, which is the instance of cognitect.aws.client.Client.
                              if missing, cognitect.aws.client.api/client would be called.
 :num-workers               - the number of workers processing messages concurrently.
                              default: (Runtime/availableProcessors) - 1
 :num-receivers             - the number of receivers polling from sqs.
                              default: (num-workers / 10) because each receiver is able to receive
                                       up to 10 messages at a time.
 :message-channel-size      - the number of messages to prefetch from sqs.
                              default: 20 * num-receivers
 :receive-limit             - the number of messages to receive at a time. 1 to 10.
                              default: 10
 :consume-limit             - the number of processing messages at the same time. 0 to 1024
                              If the consume run asynchronously, for instance inside go block,
                              you may want to use this option.
                              default: 0, which means gluttony doesn't care about how many message
                              are processed simultaneously.
 :long-polling-duration     - the duration (in seconds) for which the call waits for a message to
                              arrive in the queue before returning. 0 to 20.
                              default: 20
 :exceptional-poll-delay-ms - when an Exception is received while polling, receiver wait for the
                              number of ms until polling again.
                              default: 10000 (10 seconds).
 :heartbeat                 - the duration (in seconds) for which the consumer extends message
                              visibility if the message is being processed. 1 to 43199.
                              default: nil
                              If it isn't set, heartbeat doesn't work.
                              If it's set, :heartbeat-timeout is required.
                              Refer to AWS documents for more detail: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/working-with-messages.html
                              If you set this option and :consume-limit, recommend to make
                              :consume-limit bigger than :message-channel-size so as not to block
                              heartbeat requests.
 :heartbeat-timeout         - the timeout (in seconds) of heartbeat.
                              If your consume function doesn't call respond or raise within heartbeat
                              timeout, the consumer doesn't extend message visibility any more.
                              2 to 43200.
                              default: nil
                              :heartbeat-timeout must be longer than :heartbeat.
Output:
 a instance of gluttony.record.consumer.Consumer
sourceraw docstring

stop-consumerclj

(stop-consumer consumer)

Takes a consumer created by start-consumer and stop all processes and the client if it is created in start-consumer. This should be called to stopped consuming messages.

Takes a consumer created by start-consumer and stop all processes and the client if it is created
in start-consumer. This should be called to stopped consuming messages.
sourceraw docstring

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close