Utility functions for working with Redis streams in Carmine.
Redis does a brilliant job of being fast with loads of features and Carmine does a brilliant job of exposing all the low-level Redis commands in Clojure. Working with Redis' streams API, however, requires quite a lot of interaction to produce desirable high-level behaviour, and that high-level behaviour is what this library provides.
carmine-streams allows you to create streams and consumer groups, consume streams reliably, deal with failed consumers and unprocessable messages, and gain visibility on the state of it all with a few simple functions.
Consistent naming conventions for streams, groups and consumers:
(require '[taoensso.carmine :as car]) ;; needed for car/wcar and car/xread below
(require '[carmine-streams.core :as cs])
(def conn-opts {})
(def stream (cs/stream-name "sensor-readings")) ;; -> stream/sensor-readings
(def group (cs/group-name "persist-readings")) ;; -> group/persist-readings
(def consumer (cs/consumer-name "persist-readings" 0)) ;; -> consumer/persist-readings/0
A convenience function for writing Clojure maps to streams:
(car/wcar conn-opts (cs/xadd-map (cs/stream-name "maps") "*" {:foo "bar"}))
and parsing them back:
(let [[[_stream messages]] (car/wcar conn-opts (car/xread :count 1 :streams (cs/stream-name "maps") "0-0"))]
  (map (fn [[_id kvs]] (cs/kvs->map kvs))
       messages))
;; [{:foo "bar"}]
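Redis returns the fields of each stream entry as a flat sequence of alternating keys and values, which is why a parsing step is needed. The following is a minimal sketch of that conversion, assuming string keys; `flat-kvs->map` is a hypothetical name used only for illustration and is not the library's implementation of kvs->map.

```clojure
(defn flat-kvs->map
  "Hypothetical helper: turns [\"k1\" \"v1\" \"k2\" \"v2\"] into {:k1 \"v1\", :k2 \"v2\"}.
   Values are left untouched; only the keys are keywordized."
  [kvs]
  (into {}
        (map (fn [[k v]] [(keyword k) v]))
        (partition 2 kvs)))

(flat-kvs->map ["foo" "bar" "baz" "1"])
;; => {:foo "bar", :baz "1"}
```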
Idempotent consumer group creation:
(cs/create-consumer-group! conn-opts stream group)
Start an infinite loop that consumes from the group:
(def opts {:block 5000
           :control-fn cs/default-control-fn})
(future
  (cs/start-consumer! conn-opts
                      stream
                      group
                      consumer
                      #(println "Yum yum, tasty message" %)
                      opts))
Consumer behaviour is as follows:
unblock-consumers!
Options to the consumer consist of:
:block - ms to block waiting for a new message before checking the backlog
:control-fn - allows you to control the execution flow of the consumer (see below)
The default control flow is as follows:
You can provide your own :control-fn callback to change or add additional behaviour to the consumer. The control-fn may do whatever it pleases but must return either :exit or :recur. See default-control-fn for an example.
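To illustrate the :exit / :recur contract in isolation, here is a small self-contained sketch of a loop driven by a control function. It does not use carmine-streams: `run-loop`, `step-fn` and the single-argument shape of the control function are all assumptions made for this example, and the arguments the real control-fn receives should be taken from default-control-fn.

```clojure
(defn run-loop
  "Hypothetical driver: calls (step-fn n) repeatedly and hands each result to
   control-fn. Keeps looping while control-fn returns :recur and stops when it
   returns :exit, returning the iteration number it stopped on."
  [step-fn control-fn]
  (loop [n 0]
    (let [result (step-fn n)]
      (case (control-fn result)
        :recur (recur (inc n))
        :exit  n))))

;; Keep going while the step result is below 3, then exit.
(run-loop identity (fn [x] (if (< x 3) :recur :exit)))
;; => 3
```

Restricting the return value to two keywords keeps the loop itself trivial while leaving all policy decisions to the callback.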
To stop consumers, first interrupt the threads they are running on. The interrupt is checked before each read operation, and the consumer will exit gracefully.
In addition you should send an unblock message. This allows the consumer to abandon any blocking read of Redis it is currently performing so that it can exit.
Sending an unblock message to blocked consumers can be done like this:
;; unblock all consumers matching consumer/*
(cs/unblock-consumers! conn-opts)
;; unblock only consumers matching consumer/persist-readings/*
(cs/unblock-consumers! conn-opts (cs/consumer-name "persist-readings"))
;; unblock all consumers of group
(cs/unblock-consumers! conn-opts stream group)
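The "check the interrupt before each read" behaviour described above can be sketched without Redis as follows. This is only an illustration of the pattern: `consume-until-interrupted` and `stop-demo` are hypothetical names, and the read function here is a stand-in for the real blocking read. If your consumer runs in a future, future-cancel is one way to interrupt its thread.

```clojure
(defn consume-until-interrupted
  "Calls read-fn in a loop, checking the current thread's interrupt flag
   before each call. Returns the number of reads performed before exiting."
  [read-fn]
  (loop [reads 0]
    (if (.isInterrupted (Thread/currentThread))
      reads
      (do (read-fn)
          (recur (inc reads))))))

(defn stop-demo
  "Starts a worker thread, waits for its first read, interrupts it and
   returns the worker's read count (or :timeout if it did not exit)."
  []
  (let [started (promise)
        exited  (promise)
        worker  (Thread.
                 (fn []
                   (deliver exited
                            (consume-until-interrupted
                             #(deliver started true)))))]
    (.start worker)
    @started               ; wait until the loop is actually running
    (.interrupt worker)    ; request a graceful exit
    (deref exited 1000 :timeout)))

(stop-demo) ;; returns the number of reads the worker managed before exiting
```

A real blocking read would not notice the interrupt flag until it returns, which is why the unblock message is needed as well.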
;; all stream keys matching stream/*
(cs/all-stream-keys conn-opts) ;; -> #{"stream/sensor-readings"}
;; all stream keys matching persist-*
(cs/all-stream-keys conn-opts "persist-*")
(cs/group-names conn-opts stream) ;; -> #{"group/persist-readings"}
(cs/group-stats conn-opts stream group)
{:name "group/my-group",
 :consumers ({:name "consumer/my-consumer/0", :pending 1, :idle 102}
             {:name "consumer/my-consumer/1", :pending 0, :idle 208}
             {:name "consumer/my-consumer/2", :pending 0, :idle 311}),
 :pending 1,
 :last-delivered-id "0-2",
 :unconsumed 0}
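Because the stats are plain Clojure data, they can be queried with ordinary sequence functions. As a small sketch, assuming a map shaped like the output above, this lists the consumers that still hold pending messages; `consumers-with-pending` and `example-stats` are hypothetical names and not part of the library.

```clojure
(def example-stats
  ;; Copied from the sample output above.
  {:name "group/my-group"
   :consumers [{:name "consumer/my-consumer/0" :pending 1 :idle 102}
               {:name "consumer/my-consumer/1" :pending 0 :idle 208}
               {:name "consumer/my-consumer/2" :pending 0 :idle 311}]
   :pending 1
   :last-delivered-id "0-2"
   :unconsumed 0})

(defn consumers-with-pending
  "Returns the names of consumers whose :pending count is above zero."
  [stats]
  (->> (:consumers stats)
       (filter (comp pos? :pending))
       (mapv :name)))

(consumers-with-pending example-stats)
;; => ["consumer/my-consumer/0"]
```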
Garbage collect consumer groups to reallocate pending messages from dead consumers to live ones and send undeliverable messages to a Dead Letter Queue (DLQ).
When a message is not acknowledged by the consumer (for example, the consumer died halfway through, or the callback threw an exception) it remains pending, and its idle time is how long it has been since it was last delivered to a consumer.
The two possibilities are handled differently:
:rebalance option specifies
  :idle - time necessary for a consumer/message to be considered dead before its messages are sent to another consumer
  :siblings - option, when :active will apply the same test of idleness to sibling workers before claiming messages for them
  :distribution - option decides how to distribute work to the siblings, the choices are:
    :random - random
    :lra - least-recently-active (with the highest idle time)
    :mra - most-recently-active (with the lowest idle time)
:dlq option specifies
  :deliveries - required before the message is considered unprocessable
  :stream - to write the message metadata to
(cs/gc-consumer-group! conn-opts stream group {:rebalance {:idle 60000
                                                           :siblings :active
                                                           :distribution :random}
                                               :dlq {:deliveries 5
                                                     :stream "dlq"}})
;; returns
[{:action :dlq, :id "0-1", :consumer "consumer/messages/0"}
 {:action :rebalance, :id "0-2", :consumer "consumer/messages/0", :claimant "consumer/messages/1"}
 {:action :noop, :id "0-3", :consumer "consumer/messages/1"}]
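The three :distribution choices can be pictured as different ways of picking one sibling from a list. The sketch below assumes each sibling is a map with :name and :idle keys, like the consumer entries returned by group-stats; `pick-sibling` is a hypothetical function written for illustration and is not how the library is implemented.

```clojure
(defn pick-sibling
  "Chooses one sibling from a non-empty vector of {:name ... :idle ...} maps.
   :random picks any sibling,
   :lra    picks the least-recently-active one (highest idle time),
   :mra    picks the most-recently-active one (lowest idle time)."
  [distribution siblings]
  (case distribution
    :random (rand-nth siblings)
    :lra    (apply max-key :idle siblings)
    :mra    (apply min-key :idle siblings)))

(def siblings
  [{:name "consumer/messages/1" :idle 208}
   {:name "consumer/messages/2" :idle 311}])

(:name (pick-sibling :lra siblings)) ;; => "consumer/messages/2"
(:name (pick-sibling :mra siblings)) ;; => "consumer/messages/1"
```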
GC behaviour is as follows:
Note that both the rebalance and dlq criteria can specify :idle and :deliveries, and that a message is considered to exceed the criteria when its values exceed one OR the other of the thresholds. A threshold that is not specified is not compared.
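The "one OR the other, skipping unspecified thresholds" rule can be written down as a small predicate. This is a sketch under assumptions: `exceeds-criteria?` is a hypothetical name, the message is assumed to be a map with :idle and :deliveries, and "exceeding" is read here as strictly greater than, which the text above does not confirm.

```clojure
(defn exceeds-criteria?
  "True when the message exceeds at least one of the specified thresholds.
   A threshold that is missing from criteria is never compared."
  [{:keys [idle deliveries]} message]
  (boolean
   (or (and idle (> (:idle message) idle))
       (and deliveries (> (:deliveries message) deliveries)))))

(exceeds-criteria? {:idle 60000} {:idle 70000 :deliveries 1})               ;; => true
(exceeds-criteria? {:deliveries 5} {:idle 70000 :deliveries 1})             ;; => false
(exceeds-criteria? {:idle 60000 :deliveries 5} {:idle 10 :deliveries 6})    ;; => true
(exceeds-criteria? {} {:idle 70000 :deliveries 9})                          ;; => false
```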
You should run this function periodically, choosing values which trade off the following characteristics:
Get the next smallest message id (useful for iterating through ranges as per xrange or xpending):
(cs/next-id "0-1") ;; -> 0-2
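Redis stream ids have the form `<milliseconds>-<sequence>`, so the next smallest id is the same millisecond part with the sequence number incremented. A minimal sketch of that idea follows; `next-stream-id` is a hypothetical name, is not the library's implementation of next-id, and ignores the edge case where the sequence number is already at its maximum.

```clojure
(require '[clojure.string :as str])

(defn next-stream-id
  "Increments the sequence part of a \"<ms>-<seq>\" stream id."
  [id]
  (let [[ms sq] (str/split id #"-")]
    (str ms "-" (inc (Long/parseLong sq)))))

(next-stream-id "0-1")
;; => "0-2"
```

When paging through a range, the id after the last one received can be used as the start of the next request so that the last message is not returned twice.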
Start a normal REPL. You will need redis-server v5+ running on the default port to run the tests.
Copyright © 2020 oliyh
This program and the accompanying materials are made available under the terms of the Eclipse Public License 2.0 which is available at http://www.eclipse.org/legal/epl-2.0.
This Source Code may also be made available under the following Secondary Licenses when the conditions for such availability set forth in the Eclipse Public License, v. 2.0 are satisfied: GNU General Public License as published by the Free Software Foundation, either version 2 of the License, or (at your option) any later version, with the GNU Classpath Exception which is available at https://www.gnu.org/software/classpath/license.html.