s-exp.k7.async

core.async sugar for k7 queues

consumer-group-chan

(consumer-group-chan q group-id & {:keys [ch poll-opts cg-opts]})

Open a ConsumerGroup on queue q for group-id and start a dedicated reader thread that polls and delivers Msg values to a channel.

The ConsumerGroup is owned entirely by the reader thread: poll! and ack! are never called from any other thread. Each message is auto-acked immediately after it is placed on the output channel. This gives at-most-once delivery, so a message that is buffered on the channel but never taken is not redelivered.

Stop the consumer by putting any value onto :stop-ch (a promise-chan), or by closing :ch. The reader exits after the current batch; the ConsumerGroup is closed before the thread terminates.

Returns a map:
  :ch      - channel of Msg values
  :stop-ch - promise-chan; put any value to stop the consumer

Options:
  :ch        - supply your own channel; (a/chan 256) used if not provided
  :poll-opts - map passed to k7/poll! (default: {:max-batch 64 :timeout-ms 5})
  :cg-opts   - map passed to k7/consumer-group (default: {})
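A minimal usage sketch of the start, read, and stop cycle described above. It assumes `q` is an already-open k7 queue (how it is opened is not covered here); the alias `k7a`, the helper name `consume-n!`, and the `handle!` callback are hypothetical names chosen for illustration.

```clojure
(require '[clojure.core.async :as a]
         '[s-exp.k7.async :as k7a]) ; alias k7a is an assumption of this sketch

(defn consume-n!
  "Take up to n messages for group-id from q, pass each to handle!,
  then signal the reader thread to stop.
  q is assumed to be an already-open k7 queue."
  [q group-id n handle!]
  (let [{:keys [ch stop-ch]}
        (k7a/consumer-group-chan q group-id
                                 :poll-opts {:max-batch 16 :timeout-ms 5})]
    (try
      (loop [remaining n]
        (when (pos? remaining)
          ;; a nil take means ch was closed, so stop reading
          (when-some [msg (a/<!! ch)]
            (handle! msg)
            (recur (dec remaining)))))
      (finally
        ;; any value works; :stop is just a readable marker
        (a/put! stop-ch :stop)))))
```

Because messages are acked as soon as they reach the channel, a consumer that stops reading early, as this one does after `n` messages, should expect anything still buffered on `ch` to be skipped for that group.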

producer-chan

(producer-chan q & {:keys [ch]})

Start a dedicated writer thread that drains ch and calls enqueue! on q.

Returns ch (default: (a/chan 256)), which callers put byte arrays onto. Close ch to stop the writer thread.

Multiple producer-chans on the same queue are safe — each uses locking to serialize enqueue! calls. For single-producer use, prefer sink! which avoids locking entirely.

Options:
  :ch - supply your own channel; (a/chan 256) used if not provided
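A short sketch of putting byte arrays onto the returned channel and closing it when done. It assumes `q` is an already-open k7 queue; the alias `k7a` and the helper name `publish-all!` are hypothetical.

```clojure
(require '[clojure.core.async :as a]
         '[s-exp.k7.async :as k7a]) ; alias k7a is an assumption of this sketch

(defn publish-all!
  "Put each string in payloads onto a producer-chan for q as a UTF-8
  byte array, then close the channel to stop the writer thread.
  q is assumed to be an already-open k7 queue."
  [q payloads]
  (let [ch (k7a/producer-chan q)]
    (doseq [s payloads]
      ;; blocking put: applies back-pressure when the buffer is full
      (a/>!! ch (.getBytes ^String s "UTF-8")))
    (a/close! ch)))
```

Note that this sketch returns as soon as the channel is closed; it does not wait for the writer thread to finish calling enqueue! on whatever is still buffered.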

sink!

(sink! q ch)

Block the calling thread, taking byte arrays from ch and calling enqueue! on q until ch is closed.

enqueue! is not thread-safe, so only one sink! may run against a given queue at a time. For concurrent production, have multiple upstream producers write into ch; the channel serializes delivery to sink! automatically. You can wrap sink! in a/thread or future to run it in the background, as long as only one sink! runs per queue.
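The single-sink, many-producers arrangement could be sketched as follows. It assumes `q` is an already-open k7 queue; the alias `k7a` and the helpers `start-sink!` and `produce-concurrently!` are hypothetical names, not part of the library.

```clojure
(require '[clojure.core.async :as a]
         '[s-exp.k7.async :as k7a]) ; alias k7a is an assumption of this sketch

(defn start-sink!
  "Run exactly one sink! for q on a background thread.
  Returns {:ch input-channel :done channel-yielding-:done-when-sink!-returns}."
  [q]
  (let [ch   (a/chan 256)
        done (a/thread (k7a/sink! q ch) :done)]
    {:ch ch :done done}))

(defn produce-concurrently!
  "Start one sink, let n-producers threads each put one message onto the
  shared channel, then close it and wait for the sink to finish."
  [q n-producers]
  (let [{:keys [ch done]} (start-sink! q)
        workers (mapv (fn [i]
                        (future
                          (a/>!! ch (.getBytes ^String (str "msg-" i) "UTF-8"))))
                      (range n-producers))]
    (run! deref workers) ; make sure every put has completed before closing
    (a/close! ch)
    (a/<!! done)))
```

Waiting on the workers before closing `ch` matters here: a put onto a closed channel is dropped, so closing early would silently lose messages.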

