clj-kafka-x.consumers.simple

Clojure interface for Kafka Consumer API.
For complete JavaDocs, see:
http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/package-summary.html

byte-array-deserializer

(byte-array-deserializer)
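No usage example is given here; below is a minimal sketch of a consumer that receives keys and values as raw byte arrays. The broker address and the topic name "raw-topic" are illustrative assumptions, not part of the library.

```clojure
;; Sketch: consume raw byte-array values. Broker address and topic
;; name are assumptions for illustration.
(with-open [c (consumer {"bootstrap.servers" "localhost:9092"
                         "group.id" "bytes-group"}
                        (byte-array-deserializer)   ; keys as byte[]
                        (byte-array-deserializer))] ; values as byte[]
  (subscribe c "raw-topic")
  ;; each message's :value is a byte[]; decode as needed
  (doall (map #(String. ^bytes (:value %) "UTF-8")
              (messages c))))
```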

commit-async

(commit-async consumer)
(commit-async consumer offset-commit-fn)
(commit-async consumer topic-partition-offsets-metadata offset-commit-fn)

Commits the offsets of messages returned by the last call to the messages function or the given offsets.

NOTE This is done asynchronously and will return immediately.
(Based on the code in kafka-clients 0.9.0.0 the commit request is not
 actually made until the next time the messages function is called)

Usage:

; Commits all the offsets received from the last call to the messages function.
; Exceptions/Errors are ignored
(commit-async consumer)
;; => nil


; Commits all the offsets received from the last call to the messages function.
; Success or failure is handled by the given callback function
(commit-async consumer (fn [offsets exception]
                        (if exception
                           (println "Commits failed for " offsets " Exception->" exception)
                           (println "Commits passed for " offsets))))
;; => nil


; Commits the specified offsets to the specific topic-partitions.
; Success or failure is handled by the given callback function
(def tp-om   {{:topic "topic-a", :partition 4} {:offset 24, :metadata "important commit"},
              {:topic "topic-a", :partition 1} {:offset 234, :metadata "committed by thread A"},
              {:topic "topic-b", :partition 7} {:offset 23, :metadata "committed on 12/12/12"}})

(commit-async consumer tp-om (fn [offsets exception]
                              (if exception
                                 (println "Commits failed for " offsets " Exception->" exception)
                                 (println "Commits passed for " offsets))))
;; => nil

commit-sync

(commit-sync consumer)
(commit-sync consumer topic-partitions-offsets-metadata)

Commits the offsets of messages returned by the last call to the messages function or the given offsets.
NOTE This is a blocking I/O operation and will throw an Exception on failure

Usage:

; Commits all the offsets received from the last call to the messages function.
; If there's any failure, an Exception is thrown.
(commit-sync consumer)
;; => nil

; Commits the specified offsets to the specific topic-partitions.
; If there's any failure, an Exception is thrown.
(def tp-om   {{:topic "topic-a", :partition 4} {:offset 24, :metadata "important commit"},
              {:topic "topic-a", :partition 1} {:offset 234, :metadata "committed by thread A"},
              {:topic "topic-b", :partition 7} {:offset 23, :metadata "committed on 12/12/12"}})

(commit-sync consumer tp-om)
;; => nil
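Because commit-sync blocks and throws on failure, callers usually wrap it. A minimal sketch; the error-handling policy shown is illustrative, not part of the library:

```clojure
;; Sketch: surface a failed blocking commit without crashing the caller.
(try
  (commit-sync consumer)
  (catch Exception e
    (println "Offset commit failed:" (.getMessage e))))
```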

consumer

(consumer config)
(consumer config key-deserializer value-deserializer)

Takes a map of config options and returns a `KafkaConsumer` for consuming records from Kafka.

  NOTE `KafkaConsumer` instances are NOT thread-safe, see
  https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#multithreaded

  For more information and available config options,
  see: http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
       http://kafka.apache.org/documentation.html#newconsumerconfigs

  Usage:

;; Created using just a map of configs, in this case the keys
;; bootstrap.servers value.deserializer and key.deserializer are required
 (consumer {"bootstrap.servers" "localhost:9092"
            "group.id" "test-group-id"
            "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
            "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})

;; Created using a map of configs and the deserializers for keys and values.
 (consumer {"bootstrap.servers" "localhost:9092"
            "group.id" "test-group-id"} (string-deserializer) (string-deserializer))

;; KafkaConsumer should be closed when not used anymore, as it's closeable,
;; it can be used in the with-open macro
  (def config {"bootstrap.servers" "localhost:9092"
               "group.id" "test-group-id"})
  (with-open [c (consumer config (string-deserializer) (string-deserializer))]
    (subscribe c "topic-a")
    (take 5 (messages c)))

  

last-committed-offset

(last-committed-offset consumer tp)

Gets the last committed offset for the partition of a topic.
 NOTE This function is a blocking I/O operation.

 see http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#committed(org.apache.kafka.common.TopicPartition)

Usage:

(last-committed-offset consumer {:topic "topic-a" :partition 2})
;; => {:offset 10, :metadata "Metadata set during commit"}

list-all-partitions

(list-all-partitions consumer topic)

Get metadata about all partitions for a particular topic.
 NOTE This function is a blocking I/O operation.

 See http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#partitionsFor(java.lang.String)

Usage:

(list-all-partitions consumer "topic-b")
;; => [{:topic "topic-b",
;;      :partition 2,
;;      :leader {:id 1, :host "172.17.0.4", :port 9092},
;;      :replicas [{:id 1, :host "172.17.0.4", :port 9092}],
;;      :in-sync-replicas [{:id 1, :host "172.17.0.4", :port 9092}]}
;;     {:topic "topic-b",
;;      :partition 1,
;;      :leader {:id 3, :host "172.17.0.5", :port 9094},
;;      :replicas [{:id 3, :host "172.17.0.5", :port 9094}],
;;      :in-sync-replicas [{:id 3, :host "172.17.0.5", :port 9094}]}
;;     {:topic "topic-b",
;;      :partition 0,
;;      :leader {:id 2, :host "172.17.0.3", :port 9093},
;;      :replicas [{:id 2, :host "172.17.0.3", :port 9093}],
;;      :in-sync-replicas [{:id 2, :host "172.17.0.3", :port 9093}]}]

list-all-topics

(list-all-topics consumer)

Get metadata about ALL partitions for ALL topics that the user is authorized to view.
 NOTE This function is a blocking I/O operation.

 See http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#listTopics()

Usage:

(list-all-topics consumer)
;; =>{"topic-a"
;;    [{:topic "topic-a",
;;      :partition 0,
;;      :leader {:id 3, :host "172.17.0.5", :port 9094},
;;      :replicas [{:id 3, :host "172.17.0.5", :port 9094}],
;;      :in-sync-replicas [{:id 3, :host "172.17.0.5", :port 9094}]}],
;;    "topic-b"
;;    [{:topic "topic-b",
;;      :partition 2,
;;      :leader {:id 1, :host "172.17.0.4", :port 9092},
;;      :replicas [{:id 1, :host "172.17.0.4", :port 9092}],
;;      :in-sync-replicas [{:id 1, :host "172.17.0.4", :port 9092}]}
;;     {:topic "topic-b",
;;      :partition 1,
;;      :leader {:id 3, :host "172.17.0.5", :port 9094},
;;      :replicas [{:id 3, :host "172.17.0.5", :port 9094}],
;;      :in-sync-replicas [{:id 3, :host "172.17.0.5", :port 9094}]}
;;     {:topic "topic-b",
;;      :partition 0,
;;      :leader {:id 2, :host "172.17.0.3", :port 9093},
;;      :replicas [{:id 2, :host "172.17.0.3", :port 9093}],
;;      :in-sync-replicas [{:id 2, :host "172.17.0.3", :port 9093}]}]}

messages

(messages consumer & {:keys [timeout] :or {timeout 1000}})

Consumes messages from currently subscribed partitions and returns a sequence of messages.
If no messages are available, it will use the provided timeout (or default of 1000ms)
to BLOCK for messages to be available, before returning.

Usage:

(messages consumer)
;; => [{:topic "topic-a",
;;      :partition 0,
;;      :offset 0,
;;      :key nil,
;;      :value "Count Zero says 1 at Fri Mar 11 14:34:27 GMT 2016"}
;;     {:topic "topic-a",
;;      :partition 0,
;;      :offset 1,
;;      :key nil,
;;      :value "Count Zero says 2 at Fri Mar 11 14:34:31 GMT 2016"}]

(messages consumer :timeout 1500)
;; => [{:topic "topic-a",
;;      :partition 0,
;;      :offset 2,
;;      :key nil,
;;      :value "Count Zero says 3 at Fri Mar 11 14:34:32 GMT 2016"}]

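The messages and commit-sync functions are commonly combined into a poll loop. A sketch, where `process!` and `running?` are user-supplied, illustrative names rather than part of this library:

```clojure
;; Sketch of a poll-and-commit loop. `process!` is a side-effecting
;; handler; `running?` is an atom used to stop the loop. Both are
;; illustrative names, not part of this library.
(defn poll-loop! [consumer process! running?]
  (while @running?
    (doseq [msg (messages consumer :timeout 500)]
      (process! msg))
    ;; commit offsets for the batch just processed
    (commit-sync consumer)))
```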

metrics

(metrics consumer)

Returns a sequence of maps representing all the consumer's internal metrics.
 Each map contains information about metric-group (:group), metric-name (:name),
 metric-description (:description), metric-tags (:tags) and metric-value (:value)

Usage:

(metrics consumer)
;; => [{:group "consumer-coordinator-metrics",
;;      :name "sync-time-max",
;;      :description "The max time taken for a group sync",
;;      :tags {"client-id" "consumer-3"},
;;      :value 0.0}
;;     {:group "consumer-fetch-manager-metrics",
;;      :name "bytes-consumed-rate",
;;      :description "The average number of bytes consumed per second",
;;      :tags {"client-id" "consumer-3"},
;;      :value 0.0}]

pause

(pause consumer tp-seq)

Stops messages being consumed from the given partitions.
 This takes effect on the next call to the messages function.
 See http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#pause(org.apache.kafka.common.TopicPartition...)

Usage:

(pause consumer [{:topic "topic-a" :partition 2}
                 {:topic "topic-b" :partition 0}])

resume

(resume consumer tp-seq)

Resumes messages being consumed from the given partitions.
 This takes effect on the next call to the messages function.
 See http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#resume(org.apache.kafka.common.TopicPartition...)

Usage:

(resume consumer [{:topic "topic-a" :partition 2}
                  {:topic "topic-b" :partition 0}])
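pause and resume are typically paired for backpressure. A sketch, assuming `backlog-full?` is a user-supplied predicate (an illustrative name, not part of this library) and that tp-seq is a sequence of partition maps as in the arglists:

```clojure
;; Sketch: stop fetching from a partition while downstream catches up.
;; `backlog-full?` is an illustrative, user-supplied predicate.
(let [tps [{:topic "topic-a" :partition 2}]]
  (if (backlog-full?)
    (pause consumer tps)
    (resume consumer tps)))
```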

seek

(seek consumer tp-seq offset)
(seek consumer topic partition offset)

Seeks the consumer offset to the given offset on the topic-partitions.

 NOTE The topic-partition can be given as 2 arguments, the topic (string) and partition (int),
 or as 1 argument, a sequence of maps, e.g. '({:topic "topic" :partition 2}).
 The offset can be a long, :beginning or :end.

Usage:

(seek consumer "topic-a" 23 7)
;; => nil

(seek consumer "topic-b" 23 :beginning)
;; => nil

(seek consumer "topic-c" 23 :end)
;; => nil

(seek consumer [{:topic "topic-a" :partition 23}
                {:topic "topic-b" :partition 23}
                {:topic "topic-c" :partition 23}] 7)
;; => nil

(seek consumer [{:topic "topic-a" :partition 23}
                {:topic "topic-b" :partition 23}
                {:topic "topic-c" :partition 23}] :beginning)
;; => nil

(seek consumer [{:topic "topic-a" :partition 23}
                {:topic "topic-b" :partition 23}
                {:topic "topic-c" :partition 23}] :end)
;; => nil


string-deserializer

(string-deserializer)

subscribe

(subscribe consumer topics & {:keys [assigned-callback revoked-callback]
                              :or {assigned-callback (fn [_]) revoked-callback (fn [_])}})

Subscribes the consumer to topic partition(s) with callbacks for broker-initiated assignments.
The partitions can be specified explicitly (manual assignment) or left up to the Kafka broker (automatic assignment).
This function supports 3 forms of subscription:
1) Single or sequence of topic names             <---- Automatic partition assignment by Kafka broker
2) Regular expression matching topic name(s)     <---- Automatic partition assignment by Kafka broker
3) A sequence of specific topic partitions       <---- Manual partition assignment by user/client/consumer

NOTE a) The above 3 forms are mutually exclusive, meaning you need to unsubscribe in between subscribing using different forms.
     b) Calling subscribe again with the same form but different arguments is equivalent to unsubscribing and then subscribing anew.
     c) The optional callback function arguments are only used for automatic partition subscriptions,
        i.e. subscriptions using a single name, a sequence of names, or a regular expression.
        The callback functions should be of a single arity and should expect a sequence of maps describing
        specific partitions (e.g. [{:topic "topic-a" :partition 1} {:topic "topic-a" :partition 2}])

Usage:

(subscribe consumer "topic-a")
;; => nil

(subscribe consumer "topic-a" :assigned-callback (fn [p] (println "PartitionsAssigned:" p))
                              :revoked-callback (fn [p] (println "PartitionsRevoked:" p)))
;; => nil

(subscribe consumer ["topic-a" "topic-b"])
;; => nil

(subscribe consumer #"topic-.+")
;; => nil

(subscribe consumer [{:topic "topic-a" :partitions #{0}}
                     {:topic "topic-b" :partitions #{0 1}}
                     {:topic "topic-c" :partitions #{0}}])
;; => nil

For more in-depth information
http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.List)
http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.List,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)
http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.regex.Pattern,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)
http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.List)

subscriptions

(subscriptions consumer)

Returns all the topics that the consumer is subscribed to and the actual partitions
that it's consuming from. The data is a sequence of maps with each map being made up
of topic (assoc with :topic) and a set of consumed partitions (assoc with :partitions)
NOTE Subscriptions made using only topics (names or regex patterns) will have their
     partitions automatically assigned/managed by the broker. This can lead a consumer
     to be subscribed to a topic but NOT consuming from any of its partitions.
     (see the topic-c in the Usage example below)

Usage:

(subscriptions consumer)
;; => [{:topic "topic-a", :partitions #{0}},
;;     {:topic "topic-b", :partitions #{0 1 2}},
;;     {:topic "topic-c", :partitions #{}}]

unsubscribe

(unsubscribe consumer)

Unsubscribes the consumer from any subscribed topics and/or partitions.
It works for subscriptions carried out via subscribe-to-topics or subscribe-to-partitions functions
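Since the subscription forms are mutually exclusive, unsubscribe is needed when switching between them. A sketch, assuming topic "topic-a" exists:

```clojure
;; Sketch: switch from automatic (topic-name) subscription to manual
;; (explicit partition) assignment; unsubscribe is required in between.
(subscribe consumer "topic-a")
(unsubscribe consumer)
(subscribe consumer [{:topic "topic-a" :partitions #{0 1}}])
```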
