(create-consumer-group! conn-opts stream group)
(create-consumer-group! conn-opts stream group from-id)
An idempotent function that creates a consumer group for the stream
(gc-consumer-group!
conn-opts
stream
group
&
[{:keys [rebalance dlq]
:or {rebalance {:siblings :active :distribution :random :idle (* 60 1000)}
dlq {:stream (stream-name "dlq") :deliveries 10}}
:as opts}])
(group-stats conn-opts stream group)
Useful stats about the consumer group
(next-id id)
Given a redis message id returns the next smallest possible id
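As a rough illustration of what "next smallest possible id" means: Redis stream ids have the form `<milliseconds>-<sequence>`, so the smallest id greater than a given one keeps the millisecond part and increments the sequence part. The sketch below is not the library's implementation; `next-id-sketch` is a hypothetical name, and it assumes the id is always given in its full two-part form.

```clojure
(require '[clojure.string :as str])

;; Hypothetical helper, not part of the library: shows the id arithmetic only.
(defn next-id-sketch
  "Returns the smallest Redis stream id greater than `id`,
  assuming `id` has the full \"<ms>-<seq>\" form."
  [id]
  (let [[ms sq] (str/split id #"-")]
    ;; bigint avoids overflow, since the sequence part is an unsigned 64-bit number
    (str ms "-" (inc (bigint sq)))))

(next-id-sketch "1526919030474-55")
;; => "1526919030474-56"
```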
(start-consumer! conn-opts
stream
group
consumer-name
f
&
[{:keys [block control-fn]
:or {block 5000 control-fn default-control-fn}
:as opts}])
Consumer behaviour is as follows:

- Calls the callback for every message received, with the message coerced into a keywordized map, and acks the message. If the callback throws an exception the message will not be acked
- Processes all pending messages on startup before processing new ones
- Processes new messages until either:
  - The consumer is stopped (see `stop-consumers!`)
  - There are no messages delivered during the time it was blocked waiting for a new message, upon which it will check for pending messages and begin processing the backlog if any are found, returning to wait for new messages when the backlog is cleared

Options to the consumer consist of:

- `:block` ms to block waiting for a new message before checking the backlog
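The callback contract described for `start-consumer!` (a keywordized map in, an exception to leave the message unacked) might be exercised with a handler like the one below. This is a minimal sketch: `handle-order` and the `:order-id` / `:amount` keys are hypothetical and exist only for illustration. The call to `start-consumer!` is left as a comment because the library namespace and the shape of `conn-opts` are not shown here.

```clojure
;; Hypothetical callback: the name and message keys are illustrative only.
(defn handle-order
  "Receives one stream message as a keywordized map."
  [{:keys [order-id amount] :as msg}]
  (when-not (and order-id amount)
    ;; Throwing signals failure; per the docstring the message is then not acked
    (throw (ex-info "Malformed order message" {:msg msg})))
  ;; Normal return: per the docstring the consumer then acks the message
  {:order-id order-id :amount amount})

;; Following the signature above, options go in a single trailing map, e.g.
;; (start-consumer! conn-opts "orders" "billing" "consumer-1" handle-order {:block 2000})
```

Throwing rather than returning an error value keeps the failure path aligned with the documented behaviour, where an unacked message stays pending and is picked up again when the consumer checks its backlog.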
(stop-consumers! conn-opts)
(stop-consumers! conn-opts consumer-name-pattern)
(stop-consumers! conn-opts stream group)
Stop all the consumers for the consumer group by sending an UNBLOCK message