(assignment consumer)
returns a set of topic-partition map currently assigned to this consumer.
returns a set of topic-partition map currently assigned to this consumer.
(close! consumer)
(close! consumer timeout)
Tries to close the consumer cleanly within the specified timeout in ms (defaults to 30 secs).
This method waits up to timeout for the consumer to complete pending commits and leave the group.
If auto-commit is enabled, this will commit the current offsets if possible within the timeout.
If the consumer is unable to complete offset commits and gracefully leave the group before the timeout expires, the consumer is force closed.
Tries to close the consumer cleanly within the specified timeout in ms (defaults to 30 secs). This method waits up to timeout for the consumer to complete pending commits and leave the group. If auto-commit is enabled, this will commit the current offsets if possible within the timeout. If the consumer is unable to complete offset commits and gracefully leave the group before the timeout expires, the consumer is force closed.
(commit-message-offset consumer {:keys [partition topic offset] :as record})
Commit a specific record
consumer must be a KafkaConsumer object
record must be a map with :partition :topic and :offset
Commit a specific record consumer must be a KafkaConsumer object record must be a map with :partition :topic and :offset
(commit-sync consumer)
Commit offsets returned on the last poll()
for all
the subscribed list of topics and partitions.
consumer must be a KafkaConsumer object
Commit offsets returned on the last `poll()` for all the subscribed list of topics and partitions. consumer must be a KafkaConsumer object
(consumer conf)
(consumer conf topics)
(consumer conf key-deserializer value-deserializer)
(consumer conf key-deserializer value-deserializer topics)
create a consumer
conf is a map {:keyword value} See: https://kafka.apache.org/documentation/#consumerconfigs for all possibilities
key and value serializer can be one of keys defined in felice.serializer
namespace
with the 1 argument arity, :key.deserializer and :value.deserializer must be provided in conf
you can optionaly provide a list of topics to subscribe to
create a consumer conf is a map {:keyword value} See: https://kafka.apache.org/documentation/#consumerconfigs for all possibilities key and value serializer can be one of keys defined in `felice.serializer` namespace with the 1 argument arity, :key.deserializer and :value.deserializer must be provided in conf you can optionaly provide a list of topics to subscribe to
(consumer-record->map record)
transforms a ConsumerRecord to a clojure map containing:
:key``:value
:offset
:topic
:partition
:timestamp
:timestamp-type
and :header
transforms a ConsumerRecord to a clojure map containing: `:key``:value` `:offset` `:topic` `:partition` `:timestamp` `:timestamp-type` and `:header`
(metrics consumer)
returns a list of mtrics mapkept by the consumer
returns a list of mtrics mapkept by the consumer
(poll consumer timeout)
Fetch data for the topics or partitions specified using one of the subscribe/assign APIs.
This method returns immediately if there are records available. Otherwise, it will await the timeout ms.
If the timeout expires, an empty record set will be returned.
Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. This method returns immediately if there are records available. Otherwise, it will await the timeout ms. If the timeout expires, an empty record set will be returned.
(poll->all-records records)
takes the return off a poll (see ConsumerRecords) returns a lazy seq of records as clojure maps
takes the return off a poll (see ConsumerRecords) returns a lazy seq of records as clojure maps
(poll->records-by-topic records)
takes the return of a poll (see ConsumerRecords) returns a map {topic records-seq}
takes the return of a poll (see ConsumerRecords) returns a map {topic records-seq}
(poll-and-process consumer timeout process-fn commit-policy)
Poll records and run process-fn on each of them (presumably for side effects)
Poll records and run process-fn on each of them (presumably for side effects)
(poll-loop consumer-conf process-record-fn)
(poll-loop consumer-conf process-record-fn opts)
Start a consumer loop, calling a callback for each record, and returning a function to stop the loop.
consumer: consumer config (see consumer)
process-record-fn: function to call with each record polled
options: {:poll-timeout 2000 ; duration of a polling without events (ms)
:on-error-fn (fn [ex] ...); called on exception
:commit-policy :never ; #{:never :poll :record}}
:never : does nothing (use it if you enabled client auto commit)
:poll : commit last read offset after processing all the items of a poll
:record : commit the offset of every processed record
if you want to commit messages yourself, set commit policy to :never
and use commit-message-offset
or commit-sync
stop-fn: callback function to stop the loop
Start a consumer loop, calling a callback for each record, and returning a function to stop the loop. ### Parameters consumer: consumer config (see consumer) process-record-fn: function to call with each record polled options: {:poll-timeout 2000 ; duration of a polling without events (ms) :on-error-fn (fn [ex] ...); called on exception :commit-policy :never ; #{:never :poll :record}} #### commit policy * :never : does nothing (use it if you enabled client auto commit) * :poll : commit last read offset after processing all the items of a poll * :record : commit the offset of every processed record if you want to commit messages yourself, set commit policy to `:never` and use `commit-message-offset` or `commit-sync` ### Returns stop-fn: callback function to stop the loop
(poll-loop* consumer
process-record-fn
{:keys [poll-timeout on-error-fn commit-policy close-timeout-ms]
:or {poll-timeout 2000 close-timeout-ms 5000}})
(poll-loops consumer-conf process-record-fn)
(poll-loops consumer-conf process-record-fn {:as opts})
(poll-loops consumer-conf
process-record-fn
topics
{:keys [threads threads-by-topic] :as opts})
Start consumer loops, calling a callback for each record, and returning a function to stop the loops.
consumer: consumer config (see consumer)
process-record-fn: function to call with each record polled
topics: topics you want to subscribe to
options: {:poll-timeout 2000 ; duration of a polling without events (ms)
:on-error-fn (fn [ex] ...); called on exception
:commit-policy :never ; #{:never :poll :record}
:threads-by-topic 1 ; number of spawned consumers for each topic
:threads 1 ; number of spawned consumers}
:never : does nothing (use it if you enabled client auto commit)
:poll : commit last read offset after processing all the items of a poll
:record : commit the offset of every processed record
if you want to commit messages yourself, set commit policy to :never
and use commit-message-offset
or commit-sync
You can set either :threads-by-topic or :threads option (if both are set, :threads-by-topic will win)
stop-fn: callback function to stop the loop
Start consumer loops, calling a callback for each record, and returning a function to stop the loops. ### Parameters consumer: consumer config (see consumer) process-record-fn: function to call with each record polled topics: topics you want to subscribe to options: {:poll-timeout 2000 ; duration of a polling without events (ms) :on-error-fn (fn [ex] ...); called on exception :commit-policy :never ; #{:never :poll :record} :threads-by-topic 1 ; number of spawned consumers for each topic :threads 1 ; number of spawned consumers} #### commit policy * :never : does nothing (use it if you enabled client auto commit) * :poll : commit last read offset after processing all the items of a poll * :record : commit the offset of every processed record if you want to commit messages yourself, set commit policy to `:never` and use `commit-message-offset` or `commit-sync` #### Multi-threading You can set either :threads-by-topic or :threads option (if both are set, :threads-by-topic will win) * :threads : spawn N threads total (each thread listening all registered topic) * :threads-by-topic : spawn N threads for each registered topic * you can also provide a map {:topic :threads} instead of a list of topics ### Returns stop-fn: callback function to stop the loop
(subscribe consumer & topics)
subscribe the consumer to one or more topics automaticly resubscribes previous subscriptions returns the consumer
subscribe the consumer to one or more topics automaticly resubscribes previous subscriptions returns the consumer
(subscription consumer)
returns the set of currenctly subscribed topics
returns the set of currenctly subscribed topics
(topic-partition->map topic-partition)
converts a TopicPartition object to a clojure map containing :topic and :partition
converts a TopicPartition object to a clojure map containing :topic and :partition
(unsubscribe consumer)
Unsubscribe from all topics currently subscribed returns the consumer
Unsubscribe from all topics currently subscribed returns the consumer
(wakeup consumer)
Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll.
The thread which is blocking in an operation will throw WakeupException.
If no thread is blocking in a method which can throw WakeupException, the next call to such a method will raise it instead.
Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll. The thread which is blocking in an operation will throw WakeupException. If no thread is blocking in a method which can throw WakeupException, the next call to such a method will raise it instead.
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close