Liking cljdoc? Tell your friends :D

carmine-streams.core


all-stream-keysclj

(all-stream-keys conn-opts)
(all-stream-keys conn-opts key-pattern)
source

clear-pending!clj

(clear-pending! conn-opts stream group)
(clear-pending! conn-opts stream group consumer-name)
source

consumer-nameclj

(consumer-name s)
(consumer-name s i)
source

create-consumer-group!clj

(create-consumer-group! conn-opts streams group)
(create-consumer-group! conn-opts
                        streams
                        group
                        from-id
                        &
                        [{:keys [deregister-idle]
                          :or {deregister-idle (* 10 60 1000)}}])

An idempotent function that creates a consumer group for the stream(s) and deregisters idle consumers.

Idle-time threshold for when a consumer is considered dead can be configured like so: (create-consumer-group! conn-opts stream group from-id {:deregister-idle milliseconds-idle-threshold})

An idempotent function that creates a consumer group for the
stream(s) and deregisters idle consumers.

Idle-time threshold for when a consumer is considered dead can be
configured like so:
`(create-consumer-group!
  conn-opts stream group from-id
  {:deregister-idle milliseconds-idle-threshold})`
sourceraw docstring

default-control-fnclj

(default-control-fn phase context value & [id kvs])

The default control flow for consumers. Must return either :recur (to read the next message) or :exit to exit the loop. May have any side effects you need. Exits when unblocked via unblock-consumers! or any other error reading from redis. Recurs in all other scenarios.

The default control flow for consumers.
Must return either `:recur` (to read the next message) or `:exit` to exit the loop.
May have any side effects you need.
Exits when unblocked via `unblock-consumers!` or any other error reading from redis.
Recurs in all other scenarios.
sourceraw docstring

group-nameclj

(group-name s)
source

group-name->delivery-counts-keyclj

(group-name->delivery-counts-key consumer)
source

group-namesclj

(group-names conn-opts stream)
source

group-statsclj

(group-stats conn-opts stream group)

Useful stats about the consumer group

Useful stats about the consumer group
sourceraw docstring

kvs->mapclj

(kvs->map kvs)
source

next-idclj

(next-id id)

Given a redis message id returns the next smallest possible id

Given a redis message id returns the next smallest possible id
sourceraw docstring

prev-idclj

(prev-id id)

Given a redis message id returns the previous largest possible id

Given a redis message id returns the previous largest possible id
sourceraw docstring

start-multi-consumer!clj

(start-multi-consumer!
  conn-opts
  streams
  group
  consumer-name
  f
  &
  [{:keys [block control-fn]
    :or {block 5000 control-fn default-control-fn}
    {:keys [min-idle-time] :or {min-idle-time (* 60 1000)} :as claim-opts}
      :claim-opts}])

Starts a consumer that listens for messages on one or more streams, processing them mostly in priority order. A message is considered higher priority if its stream appears earlier in the list of streams passed to this function.

A single consumer will process messages completely in priority order unless there is an error while processing. It will then not process the message that failed until after it has processed one message of the next lowest priority (unless no messages arrive while it is blocked waiting to receive one, in which case it will retry the failed message).

Consumer behaviour is as follows:

  • Calls the callback for every message received, with the message coerced into a keywordized map, and acks the message. If the callback throws an exception the message will not be acked
  • Processes all pending messages for a given stream on startup before processing new ones. Processes new high priority messages before processing pending low priority ones.
  • If there is only one stream receiving new messages, then the consumer processes new messages until either:
    • It is explicitly unblocked (see unblock-consumers!)
    • There are no messages delivered during the time it was blocked waiting or there is a higher priority stream that may have pending messages, upon which it will check for pending messages and begin processing the backlog if any are found, returning to wait for new messages when the backlog is cleared

When checking for pending messages, if it has been sufficiently long since the last check, it will check for idle messages on the backlog of other consumers and claim them, or putting messages on the dlq if they have been retried too many times. This ensures that even if a consumer dies, its messages will still get processed.

The streams parameter can be a stream or a sequence of streams.

Options consist of:

  • :block ms to block waiting for a new message when there are no pending messages on any of the streams

  • :control-fn a function for controlling the flow of operation, see default-control-fn

  • :claim-opts an options map for configuring how messages are claimed from other consumers. Available claim options are:

    • :min-idle-time the minimum time (ms) a message has to be idle before it can be claimed. Also the minimum amount of time between checking for abandoned messages
    • :max-deliveries the maximum number of times a message should be delivered (attempted to be processed) before it is put in the dlq
    • :message-rescue-count the number of message to attempt to claim in one go
    • :dlq dead letter queue options map. Options are:
      • :stream the stream to which poison messages are added
      • :include-message? set this to false if you don't want to include original message content in the dlq message
Starts a consumer that listens for messages on one or more streams,
processing them mostly in priority order. A message is considered
higher priority if its stream appears earlier in the list of streams
passed to this function.

A single consumer will process messages completely in priority order
unless there is an error while processing. It will then not process
the message that failed until after it has processed one message of
the next lowest priority (unless no messages arrive while it is
blocked waiting to receive one, in which case it will retry the
failed message).


Consumer behaviour is as follows:

- Calls the callback for every message received, with the message
  coerced into a keywordized map, and acks the message.
  If the callback throws an exception the message will not be acked
- Processes all pending messages for a given stream on startup before
  processing new ones. Processes new high priority messages before
  processing pending low priority ones.
- If there is only one stream receiving new messages, then the consumer
  processes new messages until either:
  - It is explicitly unblocked (see `unblock-consumers!`)
  - There are no messages delivered during the time it was blocked
    waiting or there is a higher priority stream that may have pending
    messages, upon which it will check for pending messages and begin
    processing the backlog if any are found, returning to wait for new
    messages when the backlog is cleared

When checking for pending messages, if it has been sufficiently long
since the last check, it will check for idle messages on the backlog
of other consumers and claim them, or putting messages on the dlq if
they have been retried too many times. This ensures that even if a
consumer dies, its messages will still get processed.

The `streams` parameter can be a stream or a sequence of streams.

Options consist of:

- `:block` ms to block waiting for a new message when there are no
  pending messages on any of the streams
- `:control-fn` a function for controlling the flow of operation, see `default-control-fn`

- `:claim-opts` an options map for configuring how messages are
  claimed from other consumers. Available claim options are:
  - `:min-idle-time` the minimum time (ms) a message has to be idle
    before it can be claimed. Also the minimum amount of time between
    checking for abandoned messages
  - `:max-deliveries` the maximum number of times a message should
    be delivered (attempted to be processed) before it is put in
    the dlq
  - `:message-rescue-count` the number of message to attempt to
    claim in one go
  - `:dlq` dead letter queue options map. Options are:
    - `:stream` the stream to which poison messages are added
    - `:include-message?` set this to false if you don't want
      to include original message content in the dlq message
sourceraw docstring

stream-nameclj

(stream-name s)
source

unblock-consumers!clj

(unblock-consumers! conn-opts)
(unblock-consumers! conn-opts consumer-name-pattern)
(unblock-consumers! conn-opts stream group)

Unblock all the consumers for the consumer group by sending an UNBLOCK message. The default control-fn will terminate the consumer loop

Unblock all the consumers for the consumer group by sending an UNBLOCK message.
The default control-fn will terminate the consumer loop
sourceraw docstring

unblocked?clj

(unblocked? v)
source

xadd-mapclj

(xadd-map & args)
source

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close