
felice.consumer


assignment (clj)

(assignment consumer)

Returns the set of topic-partition maps currently assigned to this consumer.


close! (clj)

(close! consumer)
(close! consumer timeout)

Tries to close the consumer cleanly within the specified timeout in ms (defaults to 30 secs).

This method waits up to timeout for the consumer to complete pending commits and leave the group.

If auto-commit is enabled, this will commit the current offsets if possible within the timeout.

If the consumer is unable to complete offset commits and gracefully leave the group before the timeout expires, the consumer is force closed.

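A minimal sketch of a consumer lifecycle that always ends in close!, assuming the conf map already carries :key.deserializer and :value.deserializer. The fc alias and the consume-once helper are illustrative names, not part of felice.

```clojure
(require '[felice.consumer :as fc])

(defn consume-once
  "Polls once, prints every record, and always closes the consumer."
  [conf topics]
  (let [consumer (fc/consumer conf topics)]
    (try
      (doseq [record (fc/poll->all-records (fc/poll consumer 1000))]
        (println record))
      (finally
        ;; Allow up to 5 seconds for pending commits before the forced close.
        (fc/close! consumer 5000)))))
```

Putting close! in a finally clause means the consumer leaves the group even when processing throws.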

commit-message-offset (clj)

(commit-message-offset consumer {:keys [partition topic offset] :as record})

Commits the offset of a specific record.

consumer must be a KafkaConsumer object.

record must be a map with the keys :partition, :topic and :offset.

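A minimal sketch of per-record commits, assuming the maps produced by poll->all-records carry the :topic, :partition and :offset keys that commit-message-offset expects. The fc alias, the helper name and the handle-record callback are illustrative, not part of felice.

```clojure
(require '[felice.consumer :as fc])

(defn process-and-commit-each!
  "Handles every record of one poll and commits its offset right after."
  [consumer handle-record timeout-ms]
  (doseq [record (fc/poll->all-records (fc/poll consumer timeout-ms))]
    (handle-record record)
    ;; record is a plain map, so it can be handed straight back for the commit.
    (fc/commit-message-offset consumer record)))
```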

commit-sync (clj)

(commit-sync consumer)

Commits the offsets returned by the last poll() for all subscribed topics and partitions.

consumer must be a KafkaConsumer object

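A minimal sketch of committing once per poll, assuming auto-commit is disabled in the consumer configuration. The fc alias, the helper name and the handle-record callback are illustrative, not part of felice.

```clojure
(require '[felice.consumer :as fc])

(defn process-batch!
  "Handles every record of one poll, then commits the whole batch once.
  Returns the number of records handled."
  [consumer handle-record timeout-ms]
  (let [records (doall (fc/poll->all-records (fc/poll consumer timeout-ms)))]
    (run! handle-record records)
    ;; Commit only when the poll actually returned something.
    (when (seq records)
      (fc/commit-sync consumer))
    (count records)))
```

If handle-record throws, commit-sync is never reached, so the batch is not marked as consumed.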

CONF-COERCERS (clj)


consumer (clj)

(consumer conf)
(consumer conf topics)
(consumer conf key-deserializer value-deserializer)
(consumer conf key-deserializer value-deserializer topics)

Creates a consumer.

conf is a map {:keyword value}. See https://kafka.apache.org/documentation/#consumerconfigs for all possibilities.

The key and value deserializers can each be one of the keys defined in the felice.serializer namespace. With the 1-argument arity, :key.deserializer and :value.deserializer must be provided in conf.

You can optionally provide a list of topics to subscribe to.

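A minimal sketch of a conf map. The Kafka keys (bootstrap.servers, group.id, auto.offset.reset) come from the Kafka consumer configuration; the :string deserializer key is an assumption about what felice.serializer defines, and the broker address, group id and topic name are placeholders.

```clojure
(def consumer-conf
  {:bootstrap.servers  "localhost:9092"   ; placeholder broker address
   :group.id           "example-group"    ; placeholder consumer group
   :auto.offset.reset  "earliest"
   ;; Assumed to be a key defined in felice.serializer.
   :key.deserializer   :string
   :value.deserializer :string})

;; With the deserializers in conf, the shorter arities apply:
;; (require '[felice.consumer :as fc])
;; (fc/consumer consumer-conf)
;; (fc/consumer consumer-conf ["events"])
;;
;; Otherwise they are passed explicitly:
;; (fc/consumer (dissoc consumer-conf :key.deserializer :value.deserializer)
;;              :string :string ["events"])
```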

consumer-record->map (clj)

(consumer-record->map record)

Transforms a ConsumerRecord into a Clojure map containing :key, :value, :offset, :topic, :partition, :timestamp, :timestamp-type and :header.

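To illustrate working with such a map, here is a sketch in plain Clojure. The sample record is hypothetical data; :timestamp-type and :header are left out because their value types are not stated above.

```clojure
;; Hypothetical record, shaped like the map described above.
(def sample-record
  {:key       "user-42"
   :value     "{\"event\":\"login\"}"
   :offset    1337
   :topic     "events"
   :partition 0
   :timestamp 1700000000000})

(defn record-position
  "Keeps only the keys that identify a record's position in the log."
  [record]
  (select-keys record [:topic :partition :offset]))

(record-position sample-record)
;; => {:topic "events", :partition 0, :offset 1337}
```

The three keys kept here are the same ones commit-message-offset requires.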

metrics (clj)

(metrics consumer)

Returns a list of the metric maps kept by the consumer.


poll (clj)

(poll consumer timeout)

Fetches data for the topics or partitions specified using one of the subscribe/assign APIs.

This method returns immediately if there are records available. Otherwise, it waits up to timeout ms for records to arrive.

If the timeout expires, an empty record set will be returned.


poll->all-records (clj)

(poll->all-records records)

Takes the return value of a poll (see ConsumerRecords) and returns a lazy seq of records as Clojure maps.


poll->records-by-topic (clj)

(poll->records-by-topic records)

Takes the return value of a poll (see ConsumerRecords) and returns a map {topic records-seq}.

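A map of the {topic records-seq} shape can be summarised with ordinary map functions. The sketch below uses hypothetical data with string topic names; the actual key type returned by poll->records-by-topic is not stated above.

```clojure
(defn count-by-topic
  "Turns {topic records-seq} into {topic record-count}."
  [records-by-topic]
  (into {}
        (map (fn [[topic records]] [topic (count records)]))
        records-by-topic))

(count-by-topic {"orders"   [{:offset 1} {:offset 2}]
                 "payments" [{:offset 7}]})
;; => {"orders" 2, "payments" 1}
```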

poll-and-process (clj)

(poll-and-process consumer timeout process-fn commit-policy)

Polls records and runs process-fn on each of them (presumably for side effects).

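The commit-policy argument is not described in this docstring. Assuming it takes the same values as the :commit-policy option of poll-loop (:never, :poll, :record), the difference between the three policies can be modelled without Kafka. The function below is an illustrative model, not felice's implementation: it records which offsets would be committed instead of committing them.

```clojure
(defn process-with-policy
  "Runs process-fn on each record and returns the offsets that the given
  commit policy would commit. Illustrative model only."
  [records process-fn commit-policy]
  (let [committed (atom [])]
    (doseq [record records]
      (process-fn record)
      ;; :record commits after every processed record.
      (when (= commit-policy :record)
        (swap! committed conj (:offset record))))
    ;; :poll commits once, after the whole batch has been processed.
    (when (and (= commit-policy :poll) (seq records))
      (swap! committed conj (:offset (last records))))
    @committed))

(def batch [{:offset 10} {:offset 11} {:offset 12}])

(process-with-policy batch identity :never)  ;; => []
(process-with-policy batch identity :poll)   ;; => [12]
(process-with-policy batch identity :record) ;; => [10 11 12]
```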

poll-loop (clj)

(poll-loop consumer-conf process-record-fn)
(poll-loop consumer-conf process-record-fn opts)

Start a consumer loop, calling a callback for each record, and returning a function to stop the loop.

Parameters

    consumer-conf: consumer config (see consumer)
process-record-fn: function to call with each record polled
             opts: {:poll-timeout  2000          ; maximum duration of a poll that receives no events (ms)
                    :on-error-fn   (fn [ex] ...) ; called on exception
                    :commit-policy :never}       ; one of #{:never :poll :record}

commit policy

  • :never : does nothing (use it if you enabled client auto commit)

  • :poll : commit last read offset after processing all the items of a poll

  • :record : commit the offset of every processed record

If you want to commit messages yourself, set the commit policy to :never and use commit-message-offset or commit-sync.

Returns

          stop-fn: callback function to stop the loop
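A minimal usage sketch built from the signature and options above. It assumes the returned stop-fn takes no arguments; how the loop learns which topics to read is not stated in the docstring, so the conf is passed through untouched. The fc alias and the helper name are illustrative.

```clojure
(require '[felice.consumer :as fc])

(defn start-printing-loop
  "Starts a loop that prints each record and returns the stop function."
  [consumer-conf]
  (fc/poll-loop consumer-conf
                (fn [record] (println "received" record))
                {:poll-timeout  1000
                 :on-error-fn   (fn [ex] (println "failed:" (.getMessage ^Throwable ex)))
                 ;; Commit once per poll, after all its records were processed.
                 :commit-policy :poll}))

;; (def stop! (start-printing-loop consumer-conf))
;; ... later, for instance from a shutdown hook:
;; (stop!)
```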

poll-loop* (clj)

(poll-loop* consumer
            process-record-fn
            {:keys [poll-timeout on-error-fn commit-policy close-timeout-ms]
             :or {poll-timeout 2000 close-timeout-ms 5000}})

poll-loops (clj)

(poll-loops consumer-conf process-record-fn)
(poll-loops consumer-conf process-record-fn {:as opts})
(poll-loops consumer-conf
            process-record-fn
            topics
            {:keys [threads threads-by-topic] :as opts})

Start consumer loops, calling a callback for each record, and returning a function to stop the loops.

Parameters

    consumer-conf: consumer config (see consumer)
process-record-fn: function to call with each record polled
           topics: topics you want to subscribe to
             opts: {:poll-timeout     2000          ; maximum duration of a poll that receives no events (ms)
                    :on-error-fn      (fn [ex] ...) ; called on exception
                    :commit-policy    :never        ; one of #{:never :poll :record}
                    :threads-by-topic 1             ; number of spawned consumers for each topic
                    :threads          1}            ; number of spawned consumers

commit policy

  • :never : does nothing (use it if you enabled client auto commit)

  • :poll : commit last read offset after processing all the items of a poll

  • :record : commit the offset of every processed record

If you want to commit messages yourself, set the commit policy to :never and use commit-message-offset or commit-sync.

Multi-threading

You can set either the :threads-by-topic or the :threads option (if both are set, :threads-by-topic takes precedence).

  • :threads : spawn N threads in total (each thread listening to all registered topics)
  • :threads-by-topic : spawn N threads for each registered topic
  • you can also provide a map {:topic :threads} instead of a list of topics

Returns

          stop-fn: callback function to stop the loop
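A minimal usage sketch of the four-argument arity with :threads-by-topic. The topic names are hypothetical, the fc alias and helper name are illustrative, and the returned stop-fn is assumed to take no arguments.

```clojure
(require '[felice.consumer :as fc])

(defn start-workers
  "Starts two consumers per topic and returns the stop function."
  [consumer-conf handle-record]
  (fc/poll-loops consumer-conf
                 handle-record
                 ["orders" "payments"]        ; hypothetical topic names
                 {:threads-by-topic 2
                  ;; Commit the offset of every processed record.
                  :commit-policy    :record}))

;; (def stop-all! (start-workers consumer-conf println))
;; (stop-all!)
```

According to the docstring, the topics list can be replaced by a map from topic to thread count. The key type of that map is not specified, so that form is left out of the sketch.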

poll-loops* (clj)

(poll-loops* consumer-conf process-record-fn topics opts threads)

subscribe (clj)

(subscribe consumer & topics)

Subscribes the consumer to one or more topics. Automatically resubscribes previous subscriptions. Returns the consumer.


subscription (clj)

(subscription consumer)

Returns the set of currently subscribed topics.


topic-partition->map (clj)

(topic-partition->map topic-partition)

Converts a TopicPartition object to a Clojure map containing :topic and :partition.


unsubscribe (clj)

(unsubscribe consumer)

Unsubscribes from all currently subscribed topics. Returns the consumer.


wakeup (clj)

(wakeup consumer)

Wakes up the consumer. This method is thread-safe and is useful in particular to abort a long poll.

The thread which is blocking in an operation will throw WakeupException.

If no thread is blocking in a method which can throw WakeupException, the next call to such a method will raise it instead.

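A minimal sketch of aborting a long poll from another thread. It assumes felice's poll lets Kafka's WakeupException propagate unwrapped; the fc alias, the helper name and the handle-record callback are illustrative.

```clojure
(require '[felice.consumer :as fc])
(import 'org.apache.kafka.common.errors.WakeupException)

(defn poll-until-woken
  "Polls in a loop until wakeup is called, then closes the consumer.
  Returns :woken."
  [consumer handle-record]
  (try
    (loop []
      ;; A long timeout: without wakeup this call may block for a minute.
      (doseq [record (fc/poll->all-records (fc/poll consumer 60000))]
        (handle-record record))
      (recur))
    (catch WakeupException _
      :woken)
    (finally
      (fc/close! consumer))))

;; (def worker (future (poll-until-woken consumer println)))
;; From any other thread:
;; (fc/wakeup consumer)
;; @worker
```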
