(create-consumer-group! conn-opts stream group)
(create-consumer-group! conn-opts stream group from-id)
An idempotent function that creates a consumer group for the stream
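As a rough illustration of what "idempotent" means here, the sketch below treats Redis's `BUSYGROUP` error (returned by `XGROUP CREATE` when the group already exists) as success. This is only one way such a function could behave, not necessarily how this library does it. `create-group-idempotently!` and `fake-xgroup-create!` are hypothetical names; the fake stands in for a real Redis call so the example runs without a server.

```clojure
(require '[clojure.string :as str])

(defn create-group-idempotently!
  "Hypothetical sketch: calls `xgroup-create!` (a stand-in for issuing
   XGROUP CREATE) and treats a BUSYGROUP error as 'already created'."
  [xgroup-create! stream group from-id]
  (try
    (xgroup-create! stream group from-id)
    :created
    (catch Exception e
      (if (str/includes? (str (ex-message e)) "BUSYGROUP")
        :already-exists
        (throw e)))))

;; In-memory fake that mimics Redis rejecting a duplicate group.
(def groups (atom #{}))

(defn fake-xgroup-create! [stream group _from-id]
  (if (contains? @groups [stream group])
    (throw (ex-info "BUSYGROUP Consumer Group name already exists" {}))
    (swap! groups conj [stream group])))

;; Calling twice is safe: the second call reports the existing group.
(def first-call  (create-group-idempotently! fake-xgroup-create! "orders" "workers" "$"))
(def second-call (create-group-idempotently! fake-xgroup-create! "orders" "workers" "$"))
```

Any error other than `BUSYGROUP` is rethrown, so genuine connection or permission failures still surface.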
(default-control-fn phase context value & [id kvs])
The default control flow for consumers. Must return either `:recur` (to read the next message) or `:exit` (to exit the loop). May have any side effects you need. Exits when unblocked via `unblock-consumers!` or on any other error reading from Redis. Recurs in all other scenarios.
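A custom control function only needs to accept the same arguments and return `:recur` or `:exit`. The sketch below is a hypothetical replacement, not the library's default: it assumes that errors reach the function as a `Throwable` in `value`, which the docstring above does not confirm, and it leaves `phase` and `context` uninterpreted. Note that it is stricter than the default, because it exits on any error.

```clojure
;; Counts how many times the control function saw an error (side effect).
(def error-count (atom 0))

(defn counting-control-fn
  "Hypothetical control-fn: exits on any Throwable value, recurs otherwise.
   ASSUMPTION: errors are passed in `value` as a Throwable."
  [phase context value & [id kvs]]
  (if (instance? Throwable value)
    (do (swap! error-count inc)
        :exit)
    :recur))

;; A normal value keeps the loop going; an exception stops it.
(def on-value (counting-control-fn :some-phase {} "payload"))
(def on-error (counting-control-fn :some-phase {} (ex-info "read failed" {})))
```

Such a function would be supplied through the `:control-fn` option of `start-consumer!`.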
(gc-consumer-group! conn-opts stream group
                    & [{:keys [rebalance dlq]
                        :or {rebalance {:siblings :active :distribution :random :idle (* 60 1000)}
                             dlq {:stream (stream-name "dlq") :deliveries 10}}
                        :as opts}])
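The signature above relies on Clojure's `:or` destructuring for its defaults. The sketch below uses a hypothetical stand-in, `resolve-gc-opts`, with the same destructuring shape to show how those defaults resolve at the destructuring level. It never touches Redis, and `"dlq-stream"` is a placeholder for the result of `(stream-name "dlq")`.

```clojure
(defn resolve-gc-opts
  "Hypothetical stand-in mirroring the option destructuring of
   gc-consumer-group!; returns the resolved options only."
  [& [{:keys [rebalance dlq]
       :or {rebalance {:siblings :active :distribution :random :idle (* 60 1000)}
            dlq {:stream "dlq-stream" :deliveries 10}}}]]
  {:rebalance rebalance :dlq dlq})

;; No options: both defaults apply.
(def all-defaults (resolve-gc-opts))

;; A partial :dlq map replaces the whole default map; :or does not merge,
;; so :stream is absent here while :rebalance keeps its default.
(def partial-dlq (resolve-gc-opts {:dlq {:deliveries 3}}))
```

Whether the library merges partial maps with its defaults internally is not stated here, so passing complete `:rebalance` and `:dlq` maps avoids relying on it.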
(group-stats conn-opts stream group)
Useful stats about the consumer group
(next-id id)
Given a Redis message id, returns the next smallest possible id.
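Redis stream ids have the form `<milliseconds>-<sequence>`. Reading "next smallest possible id" as the smallest id strictly greater than the given one, a minimal sketch is to increment the sequence part. `next-id-sketch` is a hypothetical name, not the library's implementation; it treats a missing sequence part as `0` and ignores overflow of the 64-bit sequence.

```clojure
(require '[clojure.string :as str])

(defn next-id-sketch
  "Hypothetical sketch: returns the id immediately after `id` by
   incrementing its sequence part."
  [id]
  (let [[ms sq] (str/split id #"-")]
    (str ms "-" (inc (Long/parseLong (or sq "0"))))))

(next-id-sketch "1526919030474-55")
;; => "1526919030474-56"
```

A function like this is handy for turning an inclusive range start into an exclusive one when paging through a stream.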
(start-consumer! conn-opts stream group consumer-name f
                 & [{:keys [block control-fn]
                     :or {block 5000 control-fn default-control-fn}
                     :as opts}])
Consumer behaviour is as follows:

- Calls the callback for every message received, with the message coerced into a keywordized map, and acks the message. If the callback throws an exception the message will not be acked
- Processes all pending messages on startup before processing new ones
- Processes new messages until either:
  - The consumer is explicitly unblocked (see `unblock-consumers!`)
  - There are no messages delivered during the time it was blocked waiting for a new message, upon which it will check for pending messages and begin processing the backlog if any are found, returning to wait for new messages when the backlog is cleared

Options to the consumer consist of:

- `:block` ms to block waiting for a new message before checking the backlog
- `:control-fn` a function for controlling the flow of operation, see `default-control-fn`
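The ack-on-success rule described above can be sketched in isolation. The code below is an illustrative model, not the library's code: `handle-message` and `ack!` are hypothetical names, `ack!` stands in for acknowledging the message in Redis, and the message is assumed to arrive as a flat vector of alternating field names and values.

```clojure
(defn handle-message
  "Hypothetical sketch: calls `f` with the message as a keywordized map and
   acks only if `f` returns normally."
  [f ack! id kvs]
  (try
    (f (into {} (map (fn [[k v]] [(keyword k) v])) (partition 2 kvs)))
    (ack! id)
    :acked
    (catch Exception _
      ;; The callback threw, so the message stays pending for a later retry.
      :not-acked)))

;; In-memory record of acked ids, standing in for Redis.
(def acked (atom []))
(def seen (atom nil))

(def ok-result
  (handle-message #(reset! seen %) #(swap! acked conj %) "1-0" ["temp" "21" "unit" "C"]))

(def failed-result
  (handle-message (fn [_] (throw (ex-info "boom" {}))) #(swap! acked conj %) "2-0" ["temp" "22"]))
```

Because a failed message is never acked, it remains in the pending list and is picked up again when the consumer next processes its backlog.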
(unblock-consumers! conn-opts)
(unblock-consumers! conn-opts consumer-name-pattern)
(unblock-consumers! conn-opts stream group)
Unblocks all the consumers for the consumer group by sending an UNBLOCK message. The default `control-fn` will then terminate the consumer loop.