core.async sugar for k7 queues
(consumer-group-chan q group-id & {:keys [ch poll-opts cg-opts]})
Open a ConsumerGroup on queue q for group-id and start a dedicated reader thread that polls and delivers Msg values to a channel.
The ConsumerGroup is owned entirely by the reader thread: poll! and ack! are never called from any other thread. Messages are auto-acked immediately after being placed on the output channel (at-most-once delivery), so a message that has reached the channel is not redelivered if its consumer fails before processing it.
Stop the consumer by putting any value onto :stop-ch (a promise-chan), or by closing :ch. The reader exits after the current batch; the ConsumerGroup is closed before the thread terminates.
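The stop behaviour described above can be pictured with a minimal sketch of such a reader loop. This is not k7's implementation: `reader-loop`, `poll-fn` and `ack-fn` are hypothetical names, with the two functions standing in for k7/poll! and k7/ack!, whose real signatures are not shown here. The sketch also stops delivering as soon as a put fails, which may differ from what the library does when :ch is closed.

```clojure
(require '[clojure.core.async :as a])

(defn reader-loop
  "Illustrative only. poll-fn returns a (possibly empty) batch of messages,
  ack-fn acknowledges one message. Both are hypothetical stand-ins."
  [poll-fn ack-fn ch stop-ch]
  (a/thread
    (loop []
      (let [open? (reduce (fn [_ msg]
                            (if (a/>!! ch msg)
                              (do (ack-fn msg) true) ; ack right after the put succeeds
                              (reduced false)))      ; ch was closed by the consumer
                          true
                          (poll-fn))]
        ;; The stop signal is only checked between batches.
        (when (and open? (nil? (a/poll! stop-ch)))
          (recur))))
    :stopped))

;; Demo: the stop is requested before the loop starts, so exactly one
;; batch is delivered and acked before the loop exits.
(def acked (atom []))
(def out (a/chan 8))
(def stop (a/promise-chan))
(a/>!! stop :stop)
(def done (reader-loop (constantly [:m1 :m2]) #(swap! acked conj %) out stop))
(def result (a/<!! done)) ; blocks until the loop has exited
```

Because the ack follows the put directly, anything still buffered in the output channel when the loop exits has already been acknowledged.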
Returns a map:
  :ch - channel of Msg values
  :stop-ch - promise-chan; put any value to stop the consumer
Options:
  :ch - supply your own channel; (a/chan 256) is used if not provided
  :poll-opts - map passed to k7/poll! (default: {:max-batch 64 :timeout-ms 5})
  :cg-opts - map passed to k7/consumer-group (default: {})
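As a rough illustration of the consuming side, the sketch below takes a fixed number of messages from the returned map and then requests a stop. `take-n!` and `fake-consumer` are hypothetical names. The map is built by hand so the example runs without a queue; only its shape (:ch and :stop-ch) comes from the description above.

```clojure
(require '[clojure.core.async :as a])

(defn take-n!
  "Takes up to n messages from :ch, then requests a stop via :stop-ch.
  Returns the messages taken as a vector."
  [{:keys [ch stop-ch]} n]
  (let [msgs (loop [acc []]
               (if (< (count acc) n)
                 (if-some [m (a/<!! ch)]
                   (recur (conj acc m))
                   acc)            ; ch was closed early
                 acc))]
    (a/put! stop-ch :stop)         ; any value works
    msgs))

;; Stand-in for the map returned by consumer-group-chan. With k7 this would
;; be something like
;;   (consumer-group-chan q "billing" :poll-opts {:max-batch 64 :timeout-ms 5})
;; where q is an open queue and "billing" is a made-up group id (the
;; expected type of group-id is not stated in the docstring).
(def fake-consumer
  (let [ch (a/chan 8)]
    (doseq [m [:m1 :m2 :m3]] (a/>!! ch m))
    {:ch ch :stop-ch (a/promise-chan)}))

(def taken (take-n! fake-consumer 2))
```

Since delivery is at-most-once, messages still sitting in :ch at stop time have already been acked. A caller that cares about them should drain the channel after signalling the stop.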
(producer-chan q & {:keys [ch]})
Start a dedicated writer thread that drains ch and calls enqueue! on q.
Returns ch (default: (a/chan 256)), which callers put byte arrays onto. Close ch to stop the writer thread.
Multiple producer-chans on the same queue are safe: each uses locking to serialize enqueue! calls. For single-producer use, prefer sink!, which avoids locking entirely.
Options: :ch — supply your own channel; (a/chan 256) used if not provided
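The locking behaviour can be sketched as follows. This is an assumption-laden illustration, not the library's code: `writer-thread` and `fake-enqueue!` are hypothetical, an unsynchronized `java.util.ArrayList` stands in for the queue, and the sketch locks on the queue object itself, which may not be what k7 locks on.

```clojure
(require '[clojure.core.async :as a])

(defn writer-thread
  "Illustrative only. Drains ch and calls (enqueue-fn q payload) while
  holding a lock on q. enqueue-fn is a stand-in for k7's enqueue!."
  [enqueue-fn q ch]
  (a/thread
    (loop []
      (when-some [payload (a/<!! ch)]
        (locking q (enqueue-fn q payload))
        (recur)))
    :done))

;; Stand-in queue and enqueue function (not thread-safe on their own).
(def q (java.util.ArrayList.))
(defn fake-enqueue! [^java.util.ArrayList q payload] (.add q payload))

;; Two writers share one queue, each with its own channel.
(def chans [(a/chan 16) (a/chan 16)])
(def writers (mapv #(writer-thread fake-enqueue! q %) chans))

(doseq [ch chans]
  (dotimes [i 100] (a/>!! ch (.getBytes (str i))))
  (a/close! ch))                       ; closing ch stops that writer

(def results (mapv a/<!! writers))     ; wait for both writers to finish
```

The lock is what lets the two writers share a non-thread-safe target; with a single writer it is pure overhead, which is the case sink! is meant for.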
(sink! q ch)
Block the calling thread, taking byte arrays from ch and calling enqueue! on q until ch is closed.
enqueue! is not thread-safe, so only one sink! may run against a given queue at a time. For concurrent production, use multiple upstream producers writing into ch — channel delivery to sink! is serialized automatically. You can wrap sink! in a/thread or future to run in the background, as long as there is only one.
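The fan-in arrangement described above might look like the sketch below: several producers write into one channel, and a single background loop is the only code that touches the queue. `sink-like!` is a hypothetical stand-in for sink!, an `ArrayList` stands in for the queue, and returning the number of items enqueued is an assumption made for the example (the docstring does not say what sink! returns).

```clojure
(require '[clojure.core.async :as a])

(defn sink-like!
  "Illustrative only. Blocks the calling thread, taking payloads from ch
  and calling (enqueue-fn q payload) until ch is closed. No locking:
  exactly one thread runs this loop for a given q."
  [enqueue-fn q ch]
  (loop [n 0]
    (if-some [payload (a/<!! ch)]
      (do (enqueue-fn q payload)
          (recur (inc n)))
      n)))

(def q (java.util.ArrayList.))         ; stand-in for the queue
(def ch (a/chan 32))

;; One sink, run in the background with future.
(def sink
  (future (sink-like! (fn [^java.util.ArrayList q payload] (.add q payload))
                      q ch)))

;; Three upstream producers share the same channel.
(def producers
  (mapv (fn [p]
          (a/thread
            (dotimes [i 50] (a/>!! ch (.getBytes (str p "-" i))))
            :done))
        (range 3)))

(run! a/<!! producers)                 ; wait for every producer to finish
(a/close! ch)                          ; closing ch ends the sink loop
(def enqueued @sink)
```

The channel does the serialization here, so no lock is needed as long as only one such loop runs against the queue.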