Liking cljdoc? Tell your friends :D

gluttony.core


start-consumerclj

(start-consumer queue-url consume & [opts])

Creates a consumer and run it.

The consumer mainly run two type of processes based on core.async/go-loop.

  1. Receiver Receiver receives messages from AWS SQS and push them into a local buffer, core.async/channel, as quickly as possible up to the configured buffer size.

  2. Worker Worker takes a message which the receivers push and invokes the consume function which you pass as the argument.

The client MUST call stop-consumer when you no longer want to process messages.

Input: queue-url - the url of an AWS SQS queue used as the producer. consume - consume is a function that takes three arguments: 1. a 'message' is a record of gluttony.record.message/SQSMessage which contains the body of the sqs message. 2. a function when it MUST be invoked when the consume success. it takes no arguments. 3. a function when it MUST be invoked when the consume fail. it takes zero or one argument. the argument decides how long to delay in seconds for retrying. 0 to 43200. Maximum: 12 hours. when you pass no delay time, retrying as soon as possible. like this: (defn consume [message respond raise] (let [success? (...your computation uses the message...)] (if success? (respond) (raise 10))))

Optional arguments: :client - the SQS client, which is the instance of cognitect.aws.client.Client. if missing, cognitect.aws.client.api/client would be called. :num-workers - the number of workers processing messages concurrently. default: (Runtime/availableProcessors) - 1 :num-receivers - the number of receivers polling from sqs. default: (num-workers / 10) because each receiver is able to receive up to 10 messages at a time. :message-channel-size - the number of messages to prefetch from sqs. default: 20 * num-receivers :receive-limit - the number of messages to receive at a time. 1 to 10. default: 10 :consume-limit - the number of processing messages at the same time. 0 to 1024 If the consume run asynchronously, for instance inside go block, you may want to use this option. default: 0, which means gluttony doesn't care about how many message are processed simultaneously. :long-polling-duration - the duration (in seconds) for which the call waits for a message to arrive in the queue before returning. 0 to 20. default: 20 :exceptional-poll-delay-ms - when an Exception is received while polling, receiver wait for the number of ms until polling again. default: 10000 (10 seconds). :heartbeat - the duration (in seconds) for which the consumer extends message visibility if the message is being processed. 1 to 43199. default: nil If it isn't set, heartbeat doesn't work. If it's set, :heartbeat-timeout is required. Refer to AWS documents for more detail: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/working-with-messages.html If you set this option and :consume-limit, recommend to make :consume-limit bigger than :message-channel-size so as not to block heartbeat requests. :heartbeat-timeout - the timeout (in seconds) of heartbeat. If your consume function doesn't call respond or raise within heartbeat timeout, the consumer doesn't extend message visibility any more. 2 to 43200. default: nil :heartbeat-timeout must be longer than :heartbeat. :visibility-timeout-in-heartbeat - control visibility timeout (in seconds) in heartbeat. default: :heartbeat + 1 :visibility-timeout-in-heartbeat must be longer than :heartbeat. Output: a instance of gluttony.record.consumer.Consumer

Creates a consumer and run it.

The consumer mainly run two type of processes based on core.async/go-loop.

1. Receiver
Receiver receives messages from AWS SQS and push them into a local buffer, core.async/channel,
as quickly as possible up to the configured buffer size.

2. Worker
Worker takes a message which the receivers push and invokes the consume function which you pass
as the argument.

The client MUST call stop-consumer when you no longer want to process messages.

Input:
 queue-url - the url of an AWS SQS queue used as the producer.
 consume - consume is a function that takes three arguments:
           1. a 'message' is a record of gluttony.record.message/SQSMessage which contains the
           body of the sqs message.
           2. a function when it MUST be invoked when the consume success. it takes no arguments.
           3. a function when it MUST be invoked when the consume fail. it takes zero or one
           argument. the argument decides how long to delay in seconds for retrying. 0 to 43200.
           Maximum: 12 hours. when you pass no delay time, retrying as soon as possible.
           like this:
           (defn consume [message respond raise]
             (let [success? (...your computation uses the message...)]
               (if success?
                 (respond)
                 (raise 10))))

Optional arguments:
 :client                          - the SQS client, which is the instance of cognitect.aws.client.Client.
                                    if missing, cognitect.aws.client.api/client would be called.
 :num-workers                     - the number of workers processing messages concurrently.
                                    default: (Runtime/availableProcessors) - 1
 :num-receivers                   - the number of receivers polling from sqs.
                                    default: (num-workers / 10) because each receiver is able to receive
                                             up to 10 messages at a time.
 :message-channel-size            - the number of messages to prefetch from sqs.
                                    default: 20 * num-receivers
 :receive-limit                   - the number of messages to receive at a time. 1 to 10.
                                    default: 10
 :consume-limit                   - the number of processing messages at the same time. 0 to 1024
                                    If the consume run asynchronously, for instance inside go block,
                                    you may want to use this option.
                                    default: 0, which means gluttony doesn't care about how many message
                                    are processed simultaneously.
 :long-polling-duration           - the duration (in seconds) for which the call waits for a message to
                                    arrive in the queue before returning. 0 to 20.
                                    default: 20
 :exceptional-poll-delay-ms       - when an Exception is received while polling, receiver wait for the
                                    number of ms until polling again.
                                    default: 10000 (10 seconds).
 :heartbeat                       - the duration (in seconds) for which the consumer extends message
                                    visibility if the message is being processed. 1 to 43199.
                                    default: nil
                                    If it isn't set, heartbeat doesn't work.
                                    If it's set, :heartbeat-timeout is required.
                                    Refer to AWS documents for more detail: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/working-with-messages.html
                                    If you set this option and :consume-limit, recommend to make
                                    :consume-limit bigger than :message-channel-size so as not to block
                                    heartbeat requests.
 :heartbeat-timeout               - the timeout (in seconds) of heartbeat.
                                    If your consume function doesn't call respond or raise within heartbeat
                                    timeout, the consumer doesn't extend message visibility any more.
                                    2 to 43200.
                                    default: nil
                                    :heartbeat-timeout must be longer than :heartbeat.
 :visibility-timeout-in-heartbeat - control visibility timeout (in seconds) in heartbeat.
                                    default: :heartbeat + 1
                                    :visibility-timeout-in-heartbeat must be longer than :heartbeat.
Output:
 a instance of gluttony.record.consumer.Consumer
sourceraw docstring

start-receiversclj

(start-receivers consumer)

Start fetching messages from AWS SQS.

Start fetching messages from AWS SQS.
sourceraw docstring

stop-consumerclj

(stop-consumer consumer)

Takes a consumer created by start-consumer and stop all processes and the client if it is created in start-consumer. This should be called to stopped consuming messages.

Takes a consumer created by start-consumer and stop all processes and the client if it is created
in start-consumer. This should be called to stopped consuming messages.
sourceraw docstring

stop-receiversclj

(stop-receivers consumer)

Stop fetching messages from AWS SQS. WARN: Receivers during long-polling are not canceled.

Stop fetching messages from AWS SQS.
WARN: Receivers during long-polling are not canceled.
sourceraw docstring

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close