Liking cljdoc? Tell your friends :D

clj-kafka-x.consumers.shared

Clojure interface for Kafka Share Consumer API (KIP-932). Enables multiple consumers to read from the same partition, solving the Head of Line Blocking problem. For complete JavaDocs, see: https://kafka.apache.org/42/javadoc/org/apache/kafka/clients/consumer/KafkaShareConsumer.html

Clojure interface for Kafka Share Consumer API (KIP-932).
Enables multiple consumers to read from the same partition,
solving the Head of Line Blocking problem.
For complete JavaDocs, see:
https://kafka.apache.org/42/javadoc/org/apache/kafka/clients/consumer/KafkaShareConsumer.html
raw docstring

acknowledgeclj

(acknowledge consumer {:keys [topic partition offset]})
(acknowledge consumer {:keys [topic partition offset]} ack-type)

Acknowledges a consumed record. In explicit acknowledgement mode, every record must be acknowledged before the next poll.

Takes a message map as returned by messages (must contain :topic, :partition, :offset).

The acknowledgement type can be: :accept - record consumed successfully (default) :release - not consumed, release for redelivery :reject - not consumed, reject permanently :renew - still processing, renew acquisition lock

Usage:

(let [msgs (messages consumer)] (doseq [msg msgs] (acknowledge consumer msg)))

(acknowledge consumer msg :reject) ;; => nil

Acknowledges a consumed record. In explicit acknowledgement mode, every record
must be acknowledged before the next poll.

Takes a message map as returned by [[messages]] (must contain :topic, :partition, :offset).

The acknowledgement type can be:
  :accept  - record consumed successfully (default)
  :release - not consumed, release for redelivery
  :reject  - not consumed, reject permanently
  :renew   - still processing, renew acquisition lock

Usage:

(let [msgs (messages consumer)]
  (doseq [msg msgs]
    (acknowledge consumer msg)))

(acknowledge consumer msg :reject)
;; => nil
sourceraw docstring

byte-array-deserializerclj

(byte-array-deserializer)
source

closeclj

(close consumer)
(close consumer timeout-ms)
Closes the share consumer.

See: https://kafka.apache.org/42/javadoc/org/apache/kafka/clients/consumer/KafkaShareConsumer.html#close()
sourceraw docstring

commit-asyncclj

(commit-async consumer)
(commit-async consumer callback-fn)

Commits the acknowledgements for the share consumer asynchronously.

Usage:

(commit-async consumer) ;; => nil

(commit-async consumer (fn [offsets exception] (if exception (println "Commit failed:" exception) (println "Committed:" offsets)))) ;; => nil

Commits the acknowledgements for the share consumer asynchronously.

Usage:

(commit-async consumer)
;; => nil

(commit-async consumer (fn [offsets exception]
                         (if exception
                           (println "Commit failed:" exception)
                           (println "Committed:" offsets))))
;; => nil
sourceraw docstring

commit-syncclj

(commit-sync consumer)
(commit-sync consumer timeout-ms)

Commits the acknowledgements for the share consumer synchronously. Returns a map of {topic-partition -> exception-or-nil}.

Usage:

(commit-sync consumer) ;; => {{:topic "topic-a", :partition 0} nil}

Commits the acknowledgements for the share consumer synchronously.
Returns a map of {topic-partition -> exception-or-nil}.

Usage:

(commit-sync consumer)
;; => {{:topic "topic-a", :partition 0} nil}
sourceraw docstring

messagesclj

(messages consumer & {:keys [timeout] :or {timeout 1000}})

Consumes messages from currently subscribed partitions and returns a sequence of messages. If no messages are available, it will use the provided timeout (or default of 1000ms) to BLOCK for messages to be available, before returning.

Usage:

(messages consumer) ;; => [{:topic "topic-a", :partition 0, :offset 0, :key nil, :value "hello"}]

(messages consumer :timeout 1500) ;; => []

Consumes messages from currently subscribed partitions and returns a sequence of messages.
If no messages are available, it will use the provided timeout (or default of 1000ms)
to BLOCK for messages to be available, before returning.

Usage:

(messages consumer)
;; => [{:topic "topic-a", :partition 0, :offset 0, :key nil, :value "hello"}]

(messages consumer :timeout 1500)
;; => []
sourceraw docstring

metricsclj

(metrics consumer)

Returns a sequence of maps representing all the share consumer's internal metrics. Each map contains :group, :name, :description, :tags and :value.

Usage:

(metrics consumer) ;; => [{:group "consumer-share-metrics", :name "fetch-rate", ...}]

Returns a sequence of maps representing all the share consumer's internal metrics.
 Each map contains :group, :name, :description, :tags and :value.

Usage:

(metrics consumer)
;; => [{:group "consumer-share-metrics", :name "fetch-rate", ...}]
sourceraw docstring

share-consumerclj

(share-consumer config)
(share-consumer config key-deserializer value-deserializer)

Takes a map of config options and returns a KafkaShareConsumer for consuming records from Kafka using share groups. Multiple consumers in the same group can read from the same partition concurrently.

NOTE KafkaShareConsumer instances are NOT thread-safe.

For more information and available config options, see: https://kafka.apache.org/42/javadoc/org/apache/kafka/clients/consumer/KafkaShareConsumer.html

Usage:

(share-consumer {"bootstrap.servers" "localhost:9092" "group.id" "my-share-group" "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})

(share-consumer {"bootstrap.servers" "localhost:9092" "group.id" "my-share-group"} (string-deserializer) (string-deserializer))

(with-open [c (share-consumer config (string-deserializer) (string-deserializer))] (subscribe c "topic-a") (take 5 (messages c)))

Takes a map of config options and returns a `KafkaShareConsumer` for consuming
records from Kafka using share groups. Multiple consumers in the same group can
read from the same partition concurrently.

NOTE `KafkaShareConsumer` instances are NOT thread-safe.

For more information and available config options,
see: https://kafka.apache.org/42/javadoc/org/apache/kafka/clients/consumer/KafkaShareConsumer.html

Usage:

(share-consumer {"bootstrap.servers" "localhost:9092"
                 "group.id" "my-share-group"
                 "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
                 "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})

(share-consumer {"bootstrap.servers" "localhost:9092"
                 "group.id" "my-share-group"} (string-deserializer) (string-deserializer))

(with-open [c (share-consumer config (string-deserializer) (string-deserializer))]
  (subscribe c "topic-a")
  (take 5 (messages c)))
sourceraw docstring

string-deserializerclj

(string-deserializer)
source

subscribeclj

(subscribe consumer topics)

Subscribes the share consumer to the given topic(s). Takes a single topic name or a sequence of topic names.

NOTE Share consumers only support topic-name subscriptions. Pattern and manual partition assignment are not supported.

Usage:

(subscribe consumer "topic-a") ;; => nil

(subscribe consumer ["topic-a" "topic-b"]) ;; => nil

Subscribes the share consumer to the given topic(s).
Takes a single topic name or a sequence of topic names.

NOTE Share consumers only support topic-name subscriptions.
Pattern and manual partition assignment are not supported.

Usage:

(subscribe consumer "topic-a")
;; => nil

(subscribe consumer ["topic-a" "topic-b"])
;; => nil
sourceraw docstring

subscriptionsclj

(subscriptions consumer)

Returns the set of topics the share consumer is subscribed to.

Usage:

(subscriptions consumer) ;; => #{"topic-a" "topic-b"}

Returns the set of topics the share consumer is subscribed to.

Usage:

(subscriptions consumer)
;; => #{"topic-a" "topic-b"}
sourceraw docstring

unsubscribeclj

(unsubscribe consumer)

Unsubscribes the share consumer from all subscribed topics.

Unsubscribes the share consumer from all subscribed topics.
sourceraw docstring

cljdoc builds & hosts documentation for Clojure/Script libraries

Keyboard shortcuts
Ctrl+kJump to recent docs
Move to previous article
Move to next article
Ctrl+/Jump to the search field
× close