Everything related to Kafka consumers.
(close consumer)
(close consumer timeout-ms)
Tries to close the consumer cleanly within the given timeout or a default one of 30 seconds.
It will try to complete pending commits and leave the group. If auto-commit is enabled, the current offsets will be committed. If those operations aren't completed when the timeout is reached, the consumer will be force closed.
Note that `unblock` cannot be used to interrupt this fn.
@ consumer Kafka consumer.
@ timeout-ms Timeout in milliseconds.
=> nil
Throws
org.apache.kafka.common.errors
InterruptException
When the thread is interrupted while blocked.
(commit-async consumer)
(commit-async consumer callback)
(commit-async consumer callback offsets)
Asynchronously commits offsets to Kafka.
If none are given, commits offsets from the last call to `poll`.
The committed offsets will be used on the first fetch after every assignment and also on startup. As such, if the user needs to store offsets anywhere else, this fn should not be used.
Actually commits on the next trip to the server, such as calling `poll`.
The callback must accept as arguments a possible exception and a map of [topic partition] -> committed offset.
@ consumer Kafka consumer.
@ callback (nilable) Cf.
=> consumer
Ex. (commit-async consumer)

    (commit-async consumer
                  (fn [exception offsets]
                    ...))

    (commit-async consumer
                  (fn [exception offsets]
                    (when-not exception
                      (println "Should be true :"
                               (= (get offsets
                                       ["my-topic" 0])
                                  24))))
                  {["my-topic"      0] 24
                   ["another-topic" 3] 84})
Throws
Cf. `commit-sync` for exceptions.
(commit-sync consumer)
(commit-sync consumer offsets)
Synchronously commits offsets to Kafka.
If none are given, commits offsets from the last call to `poll`.
The committed offsets will be used on the first fetch after every assignment and also on startup. As such, if the user needs to store offsets anywhere else, this fn should not be used.
@ consumer Kafka consumer.
@ offsets Map of [topic partition] to offsets.
=> consumer
Ex. (commit-sync consumer)

    (commit-sync consumer
                 {["my-topic"      0] 24
                  ["another-topic" 3] 84})
Throws
org.apache.kafka.clients.consumer
CommitFailedException
When the commit failed and cannot be retried (only occurs when using subscriptions or if there is an active group with the same groupId).
org.apache.kafka.common.errors
WakeupException
When `unblock` is called before or while this fn is called.
InterruptException
When the calling thread is interrupted.
AuthorizationException
When not authorized to the specified topic.
KafkaException
Any other unrecoverable errors.
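When offsets are committed explicitly, the value committed for a partition is the offset of the next record to read, that is, the last processed offset + 1. The sketch below derives such a map from a batch of processed records. It assumes each record is a map with `:topic`, `:partition` and `:offset` keys (check `milena.interop.clj/consumer-record` for the actual shape). `offsets-to-commit` is a hypothetical helper, not part of this namespace.

```clojure
(defn offsets-to-commit
  "Given a sequence of processed records, returns a map of [topic partition]
   to the offset that should be committed, ie. the highest processed offset + 1.

   Assumption : records are maps with :topic, :partition and :offset keys."
  [records]
  (reduce (fn [acc {:keys [topic partition offset]}]
            (update acc
                    [topic partition]
                    (fnil max 0)
                    (inc offset)))
          {}
          records))

;; Hypothetical usage, assuming `poll` and `commit-sync` are referred from this namespace :
;;
;;   (commit-sync consumer
;;                (offsets-to-commit (poll consumer 1000)))
```

Keeping the computation in a pure fn makes it easy to check at the REPL before handing the result to `commit-sync` or `commit-async`.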
(committed consumer topic-partitions)
(committed consumer topic partition)
Gets the last committed offset for one or several partitions.
May block if the partition is not assigned to this consumer or if the consumer hasn't yet initialized its cache of committed offsets.
@ consumer Kafka consumer.
@ topic-partitions
List of [topic partition].
Cf. milena.interop.java/topic-partition
=> Map of [topic partition] to offsets.
@ topic Topic name.
@ partition Partition number.
=> Offset.
Ex. (committed consumer "my-topic" 0) => 42

    (committed consumer
               [["my-topic"      0]
                ["another-topic" 3]])
    => {["my-topic"      0] 42
        ["another-topic" 3] 84}
(fast-forward consumer)
(fast-forward consumer topic-partitions)
(fast-forward consumer topic partition)
Fast forwards a consumer to the latest offset (ie. offset of the last message + 1) for one or several partitions.
Happens lazily on the next call to `poll` or `offs-current`.
If no partition is given, applies to all currently assigned partitions.
@ consumer Kafka consumer.
@ topic-partitions
List of [topic partition].
Cf. milena.interop.java/topic-partition
@ topic Topic name.
@ partition Partition number.
=> consumer
Ex. (fast-forward consumer "my-topic" 0)

    (fast-forward consumer
                  [["my-topic"      0]
                   ["another-topic" 3]])
(listen consumer source)
(listen consumer source f-rebalance)
Subscribes a consumer to topics or assigns it to specific partitions.
An assignment refers precisely to a [topic partition], whereas a subscription only asks for a topic, the partitions being assigned dynamically.
<!> A consumer can only consume from one type of source. This fn will throw if the user tries to mix, for instance, subscriptions and assignments, or regexes and strings.
@ consumer Kafka consumer.
@ source One of : regular expression representing topics to subscribe to | list of topics as strings to subscribe to | list of [topic partition] to be assigned to | nil
@ f-rebalance
Optional fn for subscriptions.
Cf. milena.interop.java/consumer-rebalance-listener
Ex. (listen consumer
            #"topic-.+"
            (fn [assigned? topic-partitions]
              (when assigned?
                ...)))

    (listen consumer
            ["my-topic"
             "another-topic"])

    (listen consumer
            [["my-topic"      0]
             ["another-topic" 3]])

    (listen consumer
            nil)
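The "one type of source" rule can be checked before calling `listen`. The sketch below classifies a source into one of the four documented kinds and throws on a mixed one. `source-kind` and its keywords are hypothetical, and this is not necessarily how the library itself validates its argument.

```clojure
(defn source-kind
  "Classifies a source as accepted by `listen`.

   Returns :regex, :topics, :partitions or :none, and throws when the
   source mixes several kinds (eg. topic names and [topic partition])."
  [source]
  (cond
    (nil? source)                              :none
    (instance? java.util.regex.Pattern source) :regex
    (and (sequential? source)
         (every? string? source))              :topics
    (and (sequential? source)
         (every? vector? source))              :partitions
    :else
    (throw (ex-info "A source must be of a single kind"
                    {:source source}))))

;; (source-kind ["my-topic" ["another-topic" 3]]) throws because topic
;; names and [topic partition] are mixed in the same list.
```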
(listening consumer)
Gets all the partitions the consumer is assigned to or its subscriptions.
@ consumer Kafka consumer.
=> {:partitions
Set of topic-partitions.
Cf. milena.interop.clj/topic-partition
:subscriptions
Set of topic names.}
Cf. listen
(listening? consumer source)
Is the consumer listening to the given topic / [topic partition] ?
(make)
(make {:as opts
:keys [nodes config deserializer deserializer-key deserializer-value]
:or {nodes [["localhost" 9092]]
deserializer M.deserialize/byte-array
deserializer-key deserializer
deserializer-value deserializer}})
Builds a Kafka consumer.
<!> Consumers are NOT thread safe ! 1 consumer / thread or a queueing policy must be implemented.
@ opts (nilable) {:nodes (nilable) List of [host port].
:config (nilable) Kafka configuration map. Cf. https://kafka.apache.org/documentation/#newconsumerconfigs
:deserializer (nilable)
Kafka deserializer or fn eligible for becoming one.
Cf. milena.deserialize
milena.deserialize/make
:deserializer-key (nilable)
Defaulting to `?deserializer`.
:deserializer-value (nilable)
Defaulting to `?deserializer`.}
=> org.apache.kafka.clients.consumer.KafkaConsumer
Ex. (make {:nodes              [["some_host" 9092]]
           :config             {:group.id           "my_group"
                                :enable.auto.commit false}
           :deserializer-key   milena.deserialize/string
           :deserializer-value (fn [_ data]
                                 (nippy/thaw data))
           :listen             [["my-topic" 3]]})
(metrics consumer)
Gets metrics about this consumer.
@ consumer Kafka consumer.
=> Cf. milena.interop.clj/metrics
(offs-by-ts consumer timestamps)
(offs-by-ts consumer topic partition ts)
Finds offsets for the given partition(s) by timestamp, ie. the earliest offsets whose timestamp is greater than or equal to the corresponding ones.
<!> Blocks forever if the topic doesn't exist and dynamic creation has been disabled server-side.
@ consumer Kafka consumer.
@ timestamps
Map of [topic partition] to timestamp (Unix time in milliseconds).
Cf. milena.interop.java/topic-partition
=> Map of [topic partition] to results.
results {:timestamp Unix timestamp of the record.
:offset Offset in the partition.}
@ topic Topic name.
@ partition Partition number.
@ ts Timestamp (Unix time in milliseconds).
=> Offset.
Ex. (offs-by-ts consumer "my-topic" 0 1507812268270) => 42

    (offs-by-ts consumer
                {["my-topic"      0] 1507812268270
                 ["another-topic" 3] 1507812338294})
    => {["my-topic"      0] {:timestamp 1507812268270
                             :offset    42}
        ["another-topic" 3] {:timestamp 1507812338294
                             :offset    84}}
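The map of results can be reduced to a plain map of [topic partition] to offset, which is the shape `seek` accepts. A minimal sketch, assuming that a partition without any matching record maps to nil (the underlying Java client reports it that way, whether this library preserves it is an assumption). `results->offsets` is a hypothetical helper.

```clojure
(defn results->offsets
  "Turns a map of [topic partition] to {:timestamp ... :offset ...} into a
   map of [topic partition] to offset, dropping partitions without a result."
  [results]
  (into {}
        (keep (fn [[topic-partition result]]
                (when-some [offset (:offset result)]
                  [topic-partition offset])))
        results))

;; Hypothetical usage, rewinding a consumer to a point in time :
;;
;;   (seek consumer
;;         (results->offsets (offs-by-ts consumer
;;                                       {["my-topic" 0] 1507812268270})))
```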
(offs-current consumer topic-partitions)
(offs-current consumer topic partition)
Gets the current offset of a consumer on one or several partitions (ie. the offset of the next record).
@ consumer Kafka consumer.
@ topic-partitions
List of [topic partition].
Cf. milena.interop.java/topic-partition
=> Map of [topic partition] to offsets.
@ topic Topic name.
@ partition Partition number.
=> Position.
Ex. (offs-current consumer "my-topic" 0) => 42

    (offs-current consumer
                  [["my-topic"      0]
                   ["another-topic" 3]])
    => {["my-topic"      0] 42
        ["another-topic" 3] 84}
Throws
org.apache.kafka.common.errors
WakeupException
When `unblock` is called before or while this fn is called.
InterruptException
When the calling thread is interrupted.
AuthorizationException
When not authorized to the specified topic.
TimeoutException
When the topic metadata could not be fetched before expiration of the configured request timeout.
KafkaException
Any other unrecoverable errors.
(offs-first consumer topic-partitions)
(offs-first consumer topic partition)
Finds the first available offset of the given topic-partition(s).
<!> Blocks forever if the topic doesn't exist and dynamic creation has been disabled server-side.
@ consumer Kafka consumer.
@ topic-partitions
List of [topic partition].
Cf. milena.interop.java/topic-partition
=> Map of [topic partition] to offset.
@ topic Topic name.
@ partition Partition number.
=> Offset.
Ex. (offs-first consumer "my-topic" 0) => 421

    (offs-first consumer
                [["my-topic"      0]
                 ["another-topic" 3]])
    => {["my-topic"      0] 421
        ["another-topic" 3] 842}
(offs-latest consumer topic-partitions)
(offs-latest consumer topic partition)
Finds the latest offset of the given partition(s), ie. the offset of the last message + 1.
@ consumer Kafka consumer.
@ topic-partitions
List of [topic partition].
Cf. milena.interop.java/topic-partition
=> Map of [topic partition] to offset.
@ topic Topic name.
@ partition Partition number.
=> Offset.
Cf. offs-first
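Combined with `offs-current`, the latest offsets give the number of records a consumer still has to read on each partition. The sketch below computes that difference from the two maps. `lag` is a hypothetical helper working on plain maps of [topic partition] to offset, as returned by the multi-partition arities.

```clojure
(defn lag
  "Given a map of [topic partition] to latest offset and a map of
   [topic partition] to current position, returns a map of [topic partition]
   to the number of records left to consume.

   Partitions missing from `latest` are left out of the result."
  [latest current]
  (reduce-kv (fn [acc topic-partition position]
               (if-some [end (get latest topic-partition)]
                 (assoc acc
                        topic-partition
                        (- end position))
                 acc))
             {}
             current))

;; Hypothetical usage :
;;
;;   (let [tps [["my-topic" 0] ["another-topic" 3]]]
;;     (lag (offs-latest  consumer tps)
;;          (offs-current consumer tps)))
```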
(partitions consumer topic)
Gets a list of partitions for a given topic.
@ consumer Kafka consumer.
@ topic Topic name.
=> List of partitions.
Cf. milena.interop.clj/partition-info
Throws
org.apache.kafka.common.errors
WakeupException
When `unblock` is called before or while this fn is called.
InterruptException
When the calling thread is interrupted.
AuthorizationException
When not authorized to the specified topic.
TimeoutException
When the topic metadata could not be fetched before expiration of the configured request timeout.
KafkaException
Any other unrecoverable errors.
(pause consumer topic-partitions)
(pause consumer topic partition)
Temporarily pauses consumption.
@ consumer Kafka consumer.
@ topic-partitions
List of [topic partition].
Cf. milena.interop.java/topic-partition
@ topic Topic name.
@ partition Partition number.
=> consumer
Ex. (pause consumer "my-topic" 0)

    (pause consumer
           [["my-topic"      0]
            ["another-topic" 3]])
(paused consumer)
@ consumer Kafka consumer.
=> Set of [topic partition] currently paused.
Cf. pause
(paused? consumer topic-partition)
(paused? consumer topic partition)
Is a [topic partition] currently paused ?
Cf. `pause`, `paused`
(poll consumer)
(poll consumer timeout-ms)
Synchronously polls records.
@ consumer Kafka consumer.
@ timeout-ms (nilable) Optional timeout in milliseconds. Nil will wait forever. 0 returns what is available in the consumer buffer without blocking.
=> Sequence of individual records.
Cf. milena.interop.clj/consumer-record
Throws
org.apache.kafka.clients.consumer
InvalidOffsetException
When the offset for a partition or set of partitions is undefined or out of range and no offset reset policy has been configured.
org.apache.kafka.common.errors
WakeupException
When `unblock` is called while blocking.
InterruptException
When the calling thread is interrupted while blocking.
AuthorizationException
When not authorized to any of the assigned topics or to the configured groupId.
KafkaException
Any other unrecoverable errors (eg. deserializing key/value).
(poll-partitions consumer)
(poll-partitions consumer timeout-ms)
Synchronously polls records by partitions.
Behaves exactly like `poll` but returns a map of [topic partition] to sequence of individual records.
More efficient when the consumer polls several partitions in order to dispatch the results to workers.
Cf. poll
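Dispatching the results to workers can be sketched as follows: each partition's records are handed to a handler running on its own thread, and the calling thread waits for all of them before polling again. Only the records cross thread boundaries, never the consumer, which is not thread safe. `dispatch` and `handler` are hypothetical names, and plain futures are used here for brevity rather than a tuned thread pool.

```clojure
(defn dispatch
  "Processes each partition's records on its own thread and waits for all of them.

   `handler` is a fn of [topic partition] and its sequence of records.
   Returns a map of [topic partition] to whatever `handler` returned."
  [handler batches]
  (let [;; `mapv` is eager, so every worker is started before any is awaited.
        workers (mapv (fn [[topic-partition records]]
                        [topic-partition
                         (future (handler topic-partition records))])
                      batches)]
    (into {}
          (map (fn [[topic-partition worker]]
                 [topic-partition @worker]))
          workers)))

;; Hypothetical usage :
;;
;;   (dispatch (fn [topic-partition records]
;;               (count records))
;;             (poll-partitions consumer 1000))
```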
(poll-seq consumer)
(poll-seq consumer timeout-ms)
Converts a consumer to a sequence of individual records by lazily and continuously calling `poll`.
Ex. (take 5 (poll-seq consumer))
Cf. `poll` for arguments and exceptions.
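The idea of lazily and continuously polling can be sketched with a plain lazy sequence. This is an illustration of the technique, not necessarily how `poll-seq` is implemented. `records-seq` is a hypothetical fn taking any no-arg polling fn, so it can be tried without a broker.

```clojure
(defn records-seq
  "Returns an infinite lazy sequence of individual records.

   `poll-fn` is a no-arg fn returning a (possibly empty) batch of records.
   It is only called again once the previous batch has been consumed."
  [poll-fn]
  (lazy-seq
    (concat (poll-fn)
            (records-seq poll-fn))))

;; Hypothetical usage with a real consumer :
;;
;;   (take 5
;;         (records-seq #(poll consumer 1000)))
```

Empty batches are skipped transparently, which means that realizing the next element blocks for as long as no record arrives.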
(resume consumer)
(resume consumer topic-partitions)
(resume consumer topic partition)
Resumes consumption.
Resumes everything if no other arg than the consumer is provided.
@ consumer Kafka consumer.
@ topic-partitions
List of [topic partition].
Cf. milena.interop.java/topic-partition
@ topic Topic name.
@ partition Partition number.
=> consumer
Ex. (resume consumer)

    (resume consumer
            "my-topic"
            0)

    (resume consumer
            [["my-topic"      0]
             ["another-topic" 3]])
Cf. pause
(rewind consumer)
(rewind consumer topic-partitions)
(rewind consumer topic partition)
Rewinds a consumer to the first available offset for one or several partitions.
Happens lazily on the next call to `poll` or `offs-current`.
If no partition is given, applies to all currently assigned partitions.
@ consumer Kafka consumer.
@ topic-partitions
List of [topic partition].
Cf. milena.interop.java/topic-partition
@ topic Topic name.
@ partition Partition number.
=> consumer
Ex. (rewind consumer "my-topic" 0)

    (rewind consumer
            [["my-topic"      0]
             ["another-topic" 3]])
(safely consumer timeout-ms & body)
If the body doesn't finish computing before the given timeout, `unblock` will be called on the consumer.
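The general mechanism, a watchdog firing when a computation takes too long, can be sketched with a promise and a future. This is an illustration under stated assumptions, not the actual implementation of `safely`. `call-with-watchdog` is hypothetical, and `on-timeout` stands in for a call such as `(unblock consumer)`.

```clojure
(defn call-with-watchdog
  "Calls `f` (no-arg fn) and returns its result.

   If `f` has not returned within `timeout-ms`, `on-timeout` (no-arg fn) is
   called once from another thread. `f` is not stopped by this fn, it is up
   to `on-timeout` to make it return or throw."
  [timeout-ms on-timeout f]
  (let [done     (promise)
        watchdog (future
                   ;; Waits until `f` is done or until the timeout elapses.
                   (when (= ::timeout
                            (deref done
                                   timeout-ms
                                   ::timeout))
                     (on-timeout)))]
    (try
      (f)
      (finally
        (deliver done ::done)
        ;; Makes sure the watchdog is over before returning.
        @watchdog))))

;; Hypothetical usage :
;;
;;   (call-with-watchdog 5000
;;                       #(unblock consumer)
;;                       #(poll consumer nil))
```

If `f` returns at about the same time the timeout elapses, `on-timeout` may still fire. With `unblock`, that would make the next blocking call throw a WakeupException, as described in its documentation.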
(seek consumer offsets)
(seek consumer topic partition offset)
Seeks one or several partitions to a new offset (ie. offset of the next record).
Happens lazily on the next call to `poll`.
Note that you may lose data if this fn is arbitrarily called in the middle of consumption.
@ consumer Kafka consumer.
@ offsets
Map of [topic partition] to offset.
Cf. milena.interop.java/topic-partition
@ topic Topic name.
@ partition Partition number.
@ offset New offset.
=> consumer
Ex. (seek consumer "my-topic" 0 42)

    (seek consumer
          {["my-topic"      0] 42
           ["another-topic" 3] 84})
(topics consumer)
Gets a list of metadata about partitions for all the topics the consumer is authorized to consume.
@ consumer Kafka consumer.
=> Map of topic names to partition infos.
Cf. milena.interop.clj/partition-info
Throws
org.apache.kafka.common.errors
WakeupException
When `unblock` is called before or while this fn is called.
InterruptException
When the calling thread is interrupted before or while this fn is called.
TimeoutException
When the topic metadata could not be fetched before expiration of the configured request timeout.
KafkaException
Any other unrecoverable errors.
(unblock consumer)
From another thread, unblocks the consumer.
The blocking thread will throw an org.apache.kafka.common.errors.WakeupException.
If the thread is not blocking on a fn which can throw such an exception, the next call to such a fn will raise it instead.
Must be used sparingly, not to compensate for a bad design.
@ consumer Kafka consumer.
=> consumer