(accumulate-messages consumer topic options)
Like accumulate-subscribed-messages
, but subscribes to the topic
first.
Like `accumulate-subscribed-messages`, but subscribes to the `topic` first.
(accumulate-subscribed-messages
consumer
{:keys [timeout at-least-n format-fn filter-fn]
:or {at-least-n 1 format-fn identity filter-fn identity}})
Helper to consume messages off any existing subscriptions. Will return as
soon as at-least-n
messages have been fetched. May return more, since an
individual poll can fetch more. If timeout
(in ms) expires first, returns
any messages fetched so far. Mesages may be reformatted with format-fn
then
filtered with filter-fn
. Messages excluded by filter-fn
do not count
towards at-least-n
.
Helper to consume messages off any existing subscriptions. Will return as soon as `at-least-n` messages have been fetched. May return more, since an individual poll can fetch more. If `timeout` (in ms) expires first, returns any messages fetched so far. Mesages may be reformatted with `format-fn` then filtered with `filter-fn`. Messages excluded by `filter-fn` do not count towards `at-least-n`.
(assert-producer-not-closed producer-state)
Checks conn-open? in producer state
Checks conn-open? in producer state
(assign-partitions broker-state
consumers
participants
participants-ch
complete-ch)
(get-messages consumer timeout)
(get-messages consumer topic timeout)
DEPRECATED: use txfm-messages
instead.
Poll until consumer
receives some messages on topic
. If timeout
(in ms)
expires first, return an empty vector.
DEPRECATED: use `txfm-messages` instead. Poll until `consumer` receives some messages on `topic`. If `timeout` (in ms) expires first, return an empty vector.
(all-topics this)
(apply-pending-topics this topics)
(clean-up-subscriptions this)
(rebalance-participants broker-state consumers participants-ch complete-ch)
Try to get all the consumers to participate in the rebalance, but if they don't all check in, continue without some of them.
Try to get all the consumers to participate in the rebalance, but if they don't all check in, continue without some of them.
(txfm-messages consumer topic xf)
(txfm-messages consumer topic xf options)
Like txfm-subscribed-messages
, but subscribes to the topic
first.
Like `txfm-subscribed-messages`, but subscribes to the `topic` first.
(txfm-subscribed-messages consumer xf)
(txfm-subscribed-messages consumer
xf
{:keys [timeout ixf rf]
:or {timeout 1000 ixf cat rf conj}})
Helper to txfm messages sent to the consumer
on any existing subscriptions.
Messages will be transformed by the transducing function xf
. Will return as
soon as xf
indicates it has enough records, or the timeout
(in ms, by
default 1000) expires. Thus, to avoid waiting for the timeout
, include e.g.
(take 3)
in xf
. If the timeout
expires, will return any messages
transformed so far.
For example,
(kafka-mock/send producer "topic" "key" "5")
(kafka-mock/send producer "topic" "key" "6")
(kafka-mock/send producer "topic" "key" "7")
(kafka-mock/send producer "topic" "key" "8")
(.subscribe consumer ["topic"])
(kafka-mock/txfm-subscribed-messages consumer (comp (map :value)
(map #(Integer/parseInt %))
(filter even?)
(take 1)))
;; => [6]
By default messages retrieved from (.poll consumer)
will be collected as by
clojure.core/cat
and fed one-by-one through xf
. This behvaior can be
altered by providing a different ixf
. For example, clojure.core/conj
will
feed each batch through in its entirety. This can be combined with
max.poll.records
to control batch size.
Also by default, the transformed messages will be accumulated as by
clojure.core/conj
, and returned. A different reducing function rf
can be
supplied. For example, if xf
produces a stream of numbers, the reducing
function clojure.core/+
will return their sum.
Helper to txfm messages sent to the `consumer` on any existing subscriptions. Messages will be transformed by the transducing function `xf`. Will return as soon as `xf` indicates it has enough records, or the `timeout` (in ms, by default 1000) expires. Thus, to avoid waiting for the `timeout`, include e.g. `(take 3)` in `xf`. If the `timeout` expires, will return any messages transformed so far. For example, ``` clojure (kafka-mock/send producer "topic" "key" "5") (kafka-mock/send producer "topic" "key" "6") (kafka-mock/send producer "topic" "key" "7") (kafka-mock/send producer "topic" "key" "8") (.subscribe consumer ["topic"]) (kafka-mock/txfm-subscribed-messages consumer (comp (map :value) (map #(Integer/parseInt %)) (filter even?) (take 1))) ;; => [6] ``` By default messages retrieved from `(.poll consumer)` will be collected as by `clojure.core/cat` and fed one-by-one through `xf`. This behvaior can be altered by providing a different `ixf`. For example, `clojure.core/conj` will feed each batch through in its entirety. This can be combined with `max.poll.records` to control batch size. Also by default, the transformed messages will be accumulated as by `clojure.core/conj`, and returned. A different reducing function `rf` can be supplied. For example, if `xf` produces a stream of numbers, the reducing function `clojure.core/+` will return their sum.
(with-test-producer-consumer producer-name consumer-name & body)
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close