Clojure interface for the Kafka Share Consumer API (KIP-932). Share groups let multiple consumers read from the same partition, which addresses the head-of-line blocking problem. For complete JavaDocs, see: https://kafka.apache.org/42/javadoc/org/apache/kafka/clients/consumer/KafkaShareConsumer.html
(acknowledge consumer {:keys [topic partition offset]})
(acknowledge consumer {:keys [topic partition offset]} ack-type)
Acknowledges a consumed record. In explicit acknowledgement mode, every record must be acknowledged before the next poll.
Takes a message map as returned by messages (must contain :topic, :partition, :offset).
The acknowledgement type can be:
:accept - record consumed successfully (default)
:release - not consumed, release for redelivery
:reject - not consumed, reject permanently
:renew - still processing, renew acquisition lock
Usage:
(let [msgs (messages consumer)]
  (doseq [msg msgs]
    (acknowledge consumer msg)))
(acknowledge consumer msg :reject) ;; => nil
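One way to use the acknowledgement types is to map each processing outcome to a type. The following is a minimal sketch, not part of this library: `ack-type-for`, `process-batch!`, and the outcome keywords `:ok`, `:retryable`, and `:poison` are hypothetical names, and `ack!` stands in for a function such as `#(acknowledge consumer %1 %2)`.

```clojure
;; Hypothetical helpers for illustration; none of these names come from the library.

(defn ack-type-for
  "Maps a (hypothetical) processing outcome to an acknowledgement type."
  [outcome]
  (case outcome
    :ok        :accept
    :retryable :release
    :poison    :reject))

(defn process-batch!
  "Runs `handler` on each message and acknowledges it through `ack!`.
  `handler` returns :ok, :retryable or :poison; an exception thrown by
  `handler` is treated as :retryable (an assumption made for this sketch)."
  [ack! handler msgs]
  (doseq [msg msgs]
    (let [outcome (try
                    (handler msg)
                    (catch Exception _ :retryable))]
      (ack! msg (ack-type-for outcome)))))
```

Passing the acknowledge call in as `ack!` keeps the decision logic free of any Kafka dependency, so it can be exercised with plain maps.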
(close consumer)
(close consumer timeout-ms)
Closes the share consumer. See: https://kafka.apache.org/42/javadoc/org/apache/kafka/clients/consumer/KafkaShareConsumer.html#close()
(commit-async consumer)
(commit-async consumer callback-fn)
Commits the acknowledgements for the share consumer asynchronously.
Usage:
(commit-async consumer) ;; => nil
(commit-async consumer (fn [offsets exception]
                         (if exception
                           (println "Commit failed:" exception)
                           (println "Committed:" offsets))))
;; => nil
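Instead of printing, a callback can collect failures for later inspection. This is a sketch under the assumption that the callback receives `offsets` and `exception` exactly as in the usage above; `failure-recording-callback` is a hypothetical name, not a library function.

```clojure
;; Hypothetical helper for illustration; not part of the library.

(defn failure-recording-callback
  "Returns a two-argument callback that appends failed commits to the atom
  `failures` and ignores successful ones."
  [failures]
  (fn [offsets exception]
    (when exception
      (swap! failures conj {:offsets offsets :exception exception}))
    nil))
```

It could then be passed as `(commit-async consumer (failure-recording-callback failures))`, where `failures` is an atom holding a vector.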
(commit-sync consumer)
(commit-sync consumer timeout-ms)
Commits the acknowledgements for the share consumer synchronously. Returns a map of {topic-partition -> exception-or-nil}.
Usage:
(commit-sync consumer) ;; => {{:topic "topic-a", :partition 0} nil}
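Because the returned map pairs each topic-partition with either an exception or nil, failed partitions can be picked out with an ordinary filter. A minimal sketch, assuming the result shape shown above; `failed-commits` is a hypothetical helper name.

```clojure
;; Hypothetical helper for illustration; not part of the library.

(defn failed-commits
  "Keeps only the entries of a commit-sync result whose value is non-nil,
  that is, the topic-partitions whose commit produced an exception."
  [result]
  (into {} (filter (comp some? val)) result))
```

For the result shown in the usage above, `(failed-commits (commit-sync consumer))` would be an empty map, since the only entry maps to nil.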
(messages consumer & {:keys [timeout] :or {timeout 1000}})
Consumes messages from currently subscribed partitions and returns a sequence of messages. If no messages are available, blocks for up to the given timeout (default 1000 ms) waiting for messages before returning.
Usage:
(messages consumer) ;; => [{:topic "topic-a", :partition 0, :offset 0, :key nil, :value "hello"}]
(messages consumer :timeout 1500) ;; => []
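Since a single call may return an empty sequence after the timeout, callers often poll in a loop. The sketch below shows one bounded loop; `poll-until` is a hypothetical helper, and `poll!` stands in for a function such as `#(messages consumer :timeout 500)`. Because it polls repeatedly without acknowledging, it does not fit explicit acknowledgement mode, where every record must be acknowledged before the next poll.

```clojure
;; Hypothetical helper for illustration; not part of the library.

(defn poll-until
  "Calls `poll!` (a zero-argument function returning a sequence of messages)
  until at least `n` messages are collected or `max-polls` calls were made.
  Returns a vector of everything collected, which may hold more than `n`."
  [poll! n max-polls]
  (loop [acc [] polls 0]
    (if (or (>= (count acc) n) (>= polls max-polls))
      acc
      (recur (into acc (poll!)) (inc polls)))))
```

Bounding the number of polls keeps the loop from blocking forever on an idle topic.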
(metrics consumer)
Returns a sequence of maps representing all the share consumer's internal metrics. Each map contains :group, :name, :description, :tags and :value.
Usage:
(metrics consumer) ;; => [{:group "consumer-share-metrics", :name "fetch-rate", ...}]
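A single metric can be looked up from the returned sequence by its :group and :name. This is a minimal sketch that relies only on the keys listed above; `metric-value` is a hypothetical helper, and it returns the first match even if several metrics share a group and name but differ in :tags.

```clojure
;; Hypothetical helper for illustration; not part of the library.

(defn metric-value
  "Returns the :value of the first metric map matching `group` and
  `metric-name`, or nil when there is no match."
  [metric-maps group metric-name]
  (some (fn [m]
          (when (and (= group (:group m))
                     (= metric-name (:name m)))
            (:value m)))
        metric-maps))
```

For example, `(metric-value (metrics consumer) "consumer-share-metrics" "fetch-rate")` would look up the metric shown in the usage above.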
(share-consumer config)
(share-consumer config key-deserializer value-deserializer)
Takes a map of config options and returns a KafkaShareConsumer for consuming
records from Kafka using share groups. Multiple consumers in the same group can
read from the same partition concurrently.
NOTE KafkaShareConsumer instances are NOT thread-safe.
For more information and available config options, see: https://kafka.apache.org/42/javadoc/org/apache/kafka/clients/consumer/KafkaShareConsumer.html
Usage:
(share-consumer {"bootstrap.servers" "localhost:9092"
                 "group.id" "my-share-group"
                 "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
                 "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})
(share-consumer {"bootstrap.servers" "localhost:9092"
                 "group.id" "my-share-group"}
                (string-deserializer) (string-deserializer))
(with-open [c (share-consumer config (string-deserializer) (string-deserializer))]
  (subscribe c "topic-a")
  (take 5 (messages c)))
(subscribe consumer topics)
Subscribes the share consumer to the given topic(s). Takes a single topic name or a sequence of topic names.
NOTE Share consumers only support topic-name subscriptions. Pattern and manual partition assignment are not supported.
Usage:
(subscribe consumer "topic-a") ;; => nil
(subscribe consumer ["topic-a" "topic-b"]) ;; => nil
(subscriptions consumer)
Returns the set of topics the share consumer is subscribed to.
Usage:
(subscriptions consumer) ;; => #{"topic-a" "topic-b"}
(unsubscribe consumer)
Unsubscribes the share consumer from all subscribed topics.