Kafka consumers.
This API can also be used by mock consumers (cf. dvlopt.kafka.in.mock namespace).
Kafka consumers. This API can also be used by mock consumers (cf. dvlopt.kafka.in.mock namespace).
(beginning-offsets consumer topic-partitions)
(beginning-offsets consumer topic-partitions options)
Requests a map of [topic partition] -> first available offset for consumption.
This function does not change the current position of the consumer.
<!> Blocks forever if the topic doesn't exist and dynamic creation has been disabled server-side.
A map of options may be given :
:dvlopt.kafka/timeout
Cf. dvlopt.kafka
for description of time intervals
Ex. (beginning-offsets consumer [["my-topic" 0] ["another-topic" 3]] {:dvlopt.kafka/timeout [5 :seconds]})
Requests a map of [topic partition] -> first available offset for consumption. This function does not change the current position of the consumer. <!> Blocks forever if the topic doesn't exist and dynamic creation has been disabled server-side. A map of options may be given : :dvlopt.kafka/timeout Cf. `dvlopt.kafka` for description of time intervals Ex. (beginning-offsets consumer [["my-topic" 0] ["another-topic" 3]] {:dvlopt.kafka/timeout [5 :seconds]})
(close consumer)
(close consumer options)
Closes the consumer and releases all associated resources.
A map of options may be given :
:dvlopt.kafka/timeout
Default one is of 30 seconds.
The consumer will try to complete pending commits and leave its consumer group. If auto-commit
is enabled, the current offsets will be committed. If operations are not completed before the
timeout, the consumer will be force closed.
Cf. dvlopt.kafka
for description of time intervals
Closes the consumer and releases all associated resources. A map of options may be given : :dvlopt.kafka/timeout Default one is of 30 seconds. The consumer will try to complete pending commits and leave its consumer group. If auto-commit is enabled, the current offsets will be committed. If operations are not completed before the timeout, the consumer will be force closed. Cf. `dvlopt.kafka` for description of time intervals
(commit-offsets consumer)
(commit-offsets consumer options)
Manually and synchronously commits offsets to Kafka.
The committed offsets will be used on the first fetch after every assignment and also on startup. As such, if the user need to store offsets anywhere else, this function should not be used.
A map of options may be given :
:dvlopt.kafka/timeout
Cf. dvlopt.kafka
for description of time intervals
::topic-partition->offset
Map of [topic partition] -> offset to commit. If none is provided, offsets from the last call to poll
and variations
are used.
Ex. ;; commits specific offsets
(commit-offsets consumer
{:dvlopt.kafka/timeout [5 :seconds]
::topic-partition->offset {["my-topic" 0] 24
["another-topic" 3] 84})
Manually and synchronously commits offsets to Kafka. The committed offsets will be used on the first fetch after every assignment and also on startup. As such, if the user need to store offsets anywhere else, this function should not be used. A map of options may be given : :dvlopt.kafka/timeout Cf. `dvlopt.kafka` for description of time intervals ::topic-partition->offset Map of [topic partition] -> offset to commit. If none is provided, offsets from the last call to `poll` and variations are used. Ex. ;; commits specific offsets (commit-offsets consumer {:dvlopt.kafka/timeout [5 :seconds] ::topic-partition->offset {["my-topic" 0] 24 ["another-topic" 3] 84})
(commit-offsets-async consumer)
(commit-offsets-async consumer options)
Manually and asynchronously commits offsets to Kafka.
The end result is the same as commit-offsets-sync
but the offsets will be committed on the next
trip to the server, such as calling poll
.
Multiple calls to this function are garanteed to be sent in order and any pending async commit will happen before a new sync one.
A map of options may be given :
::on-committed Callback called when offsets are actually committed. Acceps 2 arguments : an exception in case of failure and a map of [topic partition] -> committed offset in case of success.
::topic-partition->offset
Map of [topic partition] -> offset to commit. If none is provided, offsets from the last call to poll
and variations
are used.
Ex. (commit-offsets-async consumer {::on-committed (fn callback [exception topic-partition->offset] (when exception ...)) ::topic-partition->offset {["some_topic" 0]} 42})
Manually and asynchronously commits offsets to Kafka. The end result is the same as `commit-offsets-sync` but the offsets will be committed on the next trip to the server, such as calling `poll`. Multiple calls to this function are garanteed to be sent in order and any pending async commit will happen before a new sync one. A map of options may be given : ::on-committed Callback called when offsets are actually committed. Acceps 2 arguments : an exception in case of failure and a map of [topic partition] -> committed offset in case of success. ::topic-partition->offset Map of [topic partition] -> offset to commit. If none is provided, offsets from the last call to `poll` and variations are used. Ex. (commit-offsets-async consumer {::on-committed (fn callback [exception topic-partition->offset] (when exception ...)) ::topic-partition->offset {["some_topic" 0]} 42})
(committed-offset consumer [topic partition :as topic-partition])
(committed-offset consumer [topic partition] options)
Requests the last committed offset for a [topic partition] (nil if nothing has been committed).
May block if the partition is not assigned to this consumer or if the consumer hasn't yet initialized its cache of committed offsets.
A map of options may be given :
:dvlopt.kafka/timeout
Cf. dvlopt.kafka
for description of time intervals
Requests the last committed offset for a [topic partition] (nil if nothing has been committed). May block if the partition is not assigned to this consumer or if the consumer hasn't yet initialized its cache of committed offsets. A map of options may be given : :dvlopt.kafka/timeout Cf. `dvlopt.kafka` for description of time intervals
(consumer)
(consumer options)
Builds a Kafka consumer.
<!> Consumers are NOT thread safe ! 1 consumer / thread or a queueing policy must be implemented.
A map of options may be given :
::configuration Map of Kafka consumer properties. Cf. https://kafka.apache.org/documentation/#newconsumerconfigs
:dvlopt.kafka/nodes List of [host port].
:dvlopt.kafka/deserializer.key
Prepared deserialize or a function coercing a byte-array to a key (cf. deserializer
).
:dvlopt.kafka/deserializer.value Idem, but for values.
Ex. (consumer {::configuration {"group.id" "my-group" "enable.auto.commit" false} :dvlopt.kafka/nodes [["some_host"] 9092] :dvlopt.kafka/deserializer.key :string :dvlopt.kafka/deserializer.value (fn [data _metadata] (some-> data nippy/thaw))})
Builds a Kafka consumer. <!> Consumers are NOT thread safe ! 1 consumer / thread or a queueing policy must be implemented. A map of options may be given : ::configuration Map of Kafka consumer properties. Cf. https://kafka.apache.org/documentation/#newconsumerconfigs :dvlopt.kafka/nodes List of [host port]. :dvlopt.kafka/deserializer.key Prepared deserialize or a function coercing a byte-array to a key (cf. `deserializer`). :dvlopt.kafka/deserializer.value Idem, but for values. Ex. (consumer {::configuration {"group.id" "my-group" "enable.auto.commit" false} :dvlopt.kafka/nodes [["some_host"] 9092] :dvlopt.kafka/deserializer.key :string :dvlopt.kafka/deserializer.value (fn [data _metadata] (some-> data nippy/thaw))})
(end-offsets consumer topic-partitions)
(end-offsets consumer topic-partitions options)
Requests a map of [topic partition] -> end offset (ie. offset of the last message + 1).
Works like beginning-offsets
.
Requests a map of [topic partition] -> end offset (ie. offset of the last message + 1). Works like `beginning-offsets`.
(fast-forward consumer)
(fast-forward consumer topic-partitions)
Fast forwards a consumer to the end of the given [topic partition]'s.
Actually happens lazily on the next call to poll
or next-offset
.
If no [topic partition] is supplied, applies to all currently assigned partitions.
If the consumer was configured with :isolation.level = "read_committed", the latest offsets will be the "last stable" ones.
<!> Has no effect on mock consumers. <!>
Ex. ;; fast-forward all currently registered [topic partition]'s
(fast-forward consumer)
;; fast-forward specific currently registered [topic partition]'s
(fast-forward consumer
[["my-topic" 0]
["another-topic" 3]])
Fast forwards a consumer to the end of the given [topic partition]'s. Actually happens lazily on the next call to `poll` or `next-offset`. If no [topic partition] is supplied, applies to all currently assigned partitions. If the consumer was configured with :isolation.level = "read_committed", the latest offsets will be the "last stable" ones. <!> Has no effect on mock consumers. <!> Ex. ;; fast-forward all currently registered [topic partition]'s (fast-forward consumer) ;; fast-forward specific currently registered [topic partition]'s (fast-forward consumer [["my-topic" 0] ["another-topic" 3]])
(metrics consumer)
Requests metrics about this consumer, exactly like dvlopt.kafka.out/metrics
.
Requests metrics about this consumer, exactly like `dvlopt.kafka.out/metrics`.
(next-offset consumer [topic partition :as topic-partition])
(next-offset consumer [topic partition] options)
Given a [topic partition], requests the offset of the next record this consumer can consume.
Issues a remote call only if there is no current position for the requested [topic partition].
A map of options may be given :
:dvlopt.kafka/timeout
Cf. dvlopt.kafka
for description of time intervals
Ex. (next-offset consumer ["my-topic" 0] {:dvlopt.kafka/timeout [5 :seconds]})
Given a [topic partition], requests the offset of the next record this consumer can consume. Issues a remote call only if there is no current position for the requested [topic partition]. A map of options may be given : :dvlopt.kafka/timeout Cf. `dvlopt.kafka` for description of time intervals Ex. (next-offset consumer ["my-topic" 0] {:dvlopt.kafka/timeout [5 :seconds]})
(offsets-for-timestamps consumer topic-partition->timestamp)
(offsets-for-timestamps consumer topic-partition->timestamp options)
Requests a map of [topic partition] -> map containing :
:dvlopt.kafka/offset Earliest offset whose timestamp is greather than or equal to the given one for that [topic partition].
:dvlopt.kafka/timestamp Timestamp of the record at that offset.
<!> Blocks forever if the topic doesn't exist and dynamic creation has been disabled server-side.
A map of options may be given :
:dvlopt.kafka/timeout
Cf. dvlopt.kafka
for description of time intervals
Ex. (offsets-for-timestamps consumer {["my-topic" 0] 1507812268270 ["another-topic" 3] 1507812338294} {:dvlopt.kafka/timeout [5 :seconds]})
Requests a map of [topic partition] -> map containing : :dvlopt.kafka/offset Earliest offset whose timestamp is greather than or equal to the given one for that [topic partition]. :dvlopt.kafka/timestamp Timestamp of the record at that offset. <!> Blocks forever if the topic doesn't exist and dynamic creation has been disabled server-side. A map of options may be given : :dvlopt.kafka/timeout Cf. `dvlopt.kafka` for description of time intervals Ex. (offsets-for-timestamps consumer {["my-topic" 0] 1507812268270 ["another-topic" 3] 1507812338294} {:dvlopt.kafka/timeout [5 :seconds]})
(partitions consumer topic)
(partitions consumer topic options)
Requests a list of partitions for a given topic.
For the returned value, cf. dvlopt.kafka.out/partitions
.
A map of options may be given :
:dvlopt.kafka/timeout
Cf. dvlopt.kafka
for description of time intervals
Requests a list of partitions for a given topic. For the returned value, cf. `dvlopt.kafka.out/partitions`. A map of options may be given : :dvlopt.kafka/timeout Cf. `dvlopt.kafka` for description of time intervals
(pause consumer topic-partitions)
Suspends fetching records from the given [topic partition]'s until resume
is called.
This function does not affect subscriptions nor does it trigger a consumer group rebalance.
Ex. (pause consumer [["my-topic" 0] ["another-topic" 3]])
Suspends fetching records from the given [topic partition]'s until `resume` is called. This function does not affect subscriptions nor does it trigger a consumer group rebalance. Ex. (pause consumer [["my-topic" 0] ["another-topic" 3]])
(paused consumer)
Returns a set of the currently paused [topic partition]'s.
Returns a set of the currently paused [topic partition]'s.
(poll consumer)
(poll consumer options)
Synchronously polls records from the registered sources.
A map of options may be given :
:dvlopt.kafka/timeout
An emtpy interval such as [0 :milliseconds] returns what is available in the consumer buffer without blocking.
Cf. dvlopt.kafka
for description of time intervals
A record is a map possibly containing :
:dvlopt.kafka/headers
:dvlopt.kafka/key
:dvlopt.kafka/offset (mandatory)
:dvlopt.kafka/partition (mandatory)
:dvlopt.kafka/timestamp (mandatory)
:dvlopt.kafka/timestamp.type
:dvlopt.kafka/topic (mandatory)
:dvlopt.kafka/value
Cf. dvlopt.kafka
section "Records"
Synchronously polls records from the registered sources. A map of options may be given : :dvlopt.kafka/timeout An emtpy interval such as [0 :milliseconds] returns what is available in the consumer buffer without blocking. Cf. `dvlopt.kafka` for description of time intervals A record is a map possibly containing : :dvlopt.kafka/headers :dvlopt.kafka/key :dvlopt.kafka/offset (mandatory) :dvlopt.kafka/partition (mandatory) :dvlopt.kafka/timestamp (mandatory) :dvlopt.kafka/timestamp.type :dvlopt.kafka/topic (mandatory) :dvlopt.kafka/value Cf. `dvlopt.kafka` section "Records"
(poll-seq consumer)
(poll-seq consumer options)
Returns a sequence of records obtained by lazily and continuously calling poll
under the hood.
Ex. (take 5 (poll-seq consumer {:dvlopt.kafka/timeout [5 :seconds]}))
Returns a sequence of records obtained by lazily and continuously calling `poll` under the hood. Ex. (take 5 (poll-seq consumer {:dvlopt.kafka/timeout [5 :seconds]}))
(poll-topic-partitions consumer)
(poll-topic-partitions consumer options)
Behaves exactly like poll
but returns a map of [topic partition] -> list of record.
More efficient when records are dispatched to workers by [topic partition].
Behaves exactly like `poll` but returns a map of [topic partition] -> list of record. More efficient when records are dispatched to workers by [topic partition].
(register-for consumer source)
(register-for consumer source options)
A consumer can either be assigned to a list of specific [topic partition]'s by the user or subscribe to a list of topics or a regular expression describing topics. When subscribing, the [topic partition]'s are dynamically assigned by the broker. Assignments change over time. For instance, when another consumer from the same consumer group subscribes to the same topics, [topic partition]'s are rebalanced between consumers so that one of them does not overwork or underwork.
Everytime this function is called, the consumer is unregistered from previous sources if there are any.
A map of options may be given :
::on-rebalance Callback in order to be informed in case of dynamic :assignment or :revocation (first argument) regarding [topic partition]'s (second argument).
Ex. ;; dynamic subscription by regular expression
(register-for consumer
#"topic-.+"
{::on-rebalance (fn [operation topic-partitions]
(when (= operation
:assignment)
...))})
;; dynamic subscription to given topics
(register-for consumer
["my-topic"
"another-topic"])
;; static assignment to specific [topic partition]'s
(register-for consumer
[["my-topic" 0]
["another-topic" 3]])
A consumer can either be assigned to a list of specific [topic partition]'s by the user or subscribe to a list of topics or a regular expression describing topics. When subscribing, the [topic partition]'s are dynamically assigned by the broker. Assignments change over time. For instance, when another consumer from the same consumer group subscribes to the same topics, [topic partition]'s are rebalanced between consumers so that one of them does not overwork or underwork. Everytime this function is called, the consumer is unregistered from previous sources if there are any. A map of options may be given : ::on-rebalance Callback in order to be informed in case of dynamic :assignment or :revocation (first argument) regarding [topic partition]'s (second argument). Ex. ;; dynamic subscription by regular expression (register-for consumer #"topic-.+" {::on-rebalance (fn [operation topic-partitions] (when (= operation :assignment) ...))}) ;; dynamic subscription to given topics (register-for consumer ["my-topic" "another-topic"]) ;; static assignment to specific [topic partition]'s (register-for consumer [["my-topic" 0] ["another-topic" 3]])
(registered-for consumer)
Returns a map containing :
::assignments Set of all [topic partition]'s the consumer is currently assigned to, either by the broker or by the user.
::subscriptions Set of all topics the consumer is subscribed to, if any.
Returns a map containing : ::assignments Set of all [topic partition]'s the consumer is currently assigned to, either by the broker or by the user. ::subscriptions Set of all topics the consumer is subscribed to, if any.
(registered-for? consumer source)
Is the consumer listening to the given topic / [topic partition] ?
Is the consumer listening to the given topic / [topic partition] ?
(resume consumer)
(resume consumer topic-partitions)
Undoes pause
.
Ex. ;; resumes everything that has been paused
(resume consumer)
;; resumes only specific [topic partition]'s
(resume [["my-topic" 0]
["another-topic" 3]])
Undoes `pause`. Ex. ;; resumes everything that has been paused (resume consumer) ;; resumes only specific [topic partition]'s (resume [["my-topic" 0] ["another-topic" 3]])
(rewind consumer)
(rewind consumer topic-partitions)
Rewinds a consumer to the first available offset for the given [topic partition]'s.
Actually happens lazily on the next call to poll
or next-offset
.
<!> Has no effect on mock consumers. <!>
Ex. ;; rewind all currently registered [topic partition]'s
(rewind consumer)
;; rewind only specific currently registered [topic partition]'s
(rewind consumer
[["my-topic" 0]
["another-topic" 3]])
Rewinds a consumer to the first available offset for the given [topic partition]'s. Actually happens lazily on the next call to `poll` or `next-offset`. <!> Has no effect on mock consumers. <!> Ex. ;; rewind all currently registered [topic partition]'s (rewind consumer) ;; rewind only specific currently registered [topic partition]'s (rewind consumer [["my-topic" 0] ["another-topic" 3]])
(seek consumer topic-partition->offset)
Moves the consumer to the new offsets.
Actually happens lazily on the next call to poll
and variations.
Ex. (seek consumer {["my-topic" 0] 42 ["another-topic" 3] 84})
Moves the consumer to the new offsets. Actually happens lazily on the next call to `poll` and variations. Ex. (seek consumer {["my-topic" 0] 42 ["another-topic" 3] 84})
(topics consumer)
(topics consumer options)
Requests a list of metadata about partitions for all topics the consumer is authorized to consume.
Returns a map of topic -> list of metadata exactly like in partitions
.
A map of options may be given :
:dvlopt.kafka/timeout
Cf. dvlopt.kafka
for description of time intervals
Requests a list of metadata about partitions for all topics the consumer is authorized to consume. Returns a map of topic -> list of metadata exactly like in `partitions`. A map of options may be given : :dvlopt.kafka/timeout Cf. `dvlopt.kafka` for description of time intervals
(unregister consumer)
Unregisters a consumer from all the sources it is consuming.
Unregisters a consumer from all the sources it is consuming.
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close