dvlopt.kafka.in

Kafka consumers.

beginning-offsets

(beginning-offsets consumer topic-partitions)
(beginning-offsets consumer topic-partitions options)

Requests a map of [topic partition] -> first available offset for consumption.

This function does not change the current position of the consumer.

<!> Blocks forever if the topic doesn't exist and dynamic creation has been disabled server-side.


A map of options may be given :

   :dvlopt.kafka/timeout
   Cf. `dvlopt.kafka` for description of time intervals


Ex. (beginning-offsets consumer
                       [["my-topic"      0]
                        ["another-topic" 3]]
                       {:dvlopt.kafka/timeout [5 :seconds]})

close

(close consumer)
(close consumer options)

Closes the consumer and releases all associated resources.

A map of options may be given :

  :dvlopt.kafka/timeout
   Defaults to 30 seconds.
   The consumer will try to complete pending commits and leave its consumer group. If auto-commit
   is enabled, the current offsets will be committed. If these operations do not complete before the
   timeout, the consumer is forcibly closed.
   Cf. `dvlopt.kafka` for description of time intervals

commit-offsets

(commit-offsets consumer)
(commit-offsets consumer options)

Manually and synchronously commits offsets to Kafka.

The committed offsets will be used on the first fetch after every assignment and also on startup.
As such, if the user needs to store offsets anywhere else, this function should not be used.


A map of options may be given :

  :dvlopt.kafka/timeout
   Cf. `dvlopt.kafka` for description of time intervals

  ::topic-partition->offset
   Map of [topic partition] -> offset to commit. If none is provided, offsets from the last call to `poll` and variations
   are used.


Ex. ;; commits specific offsets

    (commit-offsets consumer
                    {:dvlopt.kafka/timeout     [5 :seconds]
                     ::topic-partition->offset {["my-topic"      0] 24
                                                ["another-topic" 3] 84}})

commit-offsets-async

(commit-offsets-async consumer)
(commit-offsets-async consumer options)

Manually and asynchronously commits offsets to Kafka.

The end result is the same as `commit-offsets` but the offsets will be committed on the next
trip to the server, such as calling `poll`.

Multiple calls to this function are guaranteed to be sent in order and any pending async commit will happen before a new sync one.


A map of options may be given :

  ::on-committed
   Callback called when offsets are actually committed. Accepts 2 arguments : an exception in case of failure and a map of
   [topic partition] -> committed offset in case of success.

  ::topic-partition->offset
   Map of [topic partition] -> offset to commit. If none is provided, offsets from the last call to `poll` and variations
   are used.


Ex. (commit-offsets-async consumer
                          {::on-committed            (fn callback [exception topic-partition->offset]
                                                       (when exception
                                                         ...))
                           ::topic-partition->offset {["some_topic" 0] 42}})
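
As a rough sketch of what an `::on-committed` callback could look like, the hypothetical handler below records failures in an atom and passes successful results through. The names `failed-commits` and `on-committed` are illustrative and not part of this namespace; the code relies only on the two-argument contract described above.

```clojure
;; Hypothetical helpers for illustration, not part of dvlopt.kafka.in.

(def failed-commits
  "Collects the exceptions reported by asynchronous commits."
  (atom []))

(defn on-committed
  "Callback taking an exception (non-nil on failure) and a map of
   [topic partition] -> committed offset (on success)."
  [exception topic-partition->offset]
  (if exception
    ;; Failure: remember the exception so that it can be inspected later.
    (swap! failed-commits conj exception)
    ;; Success: simply hand back what was committed.
    topic-partition->offset))
```

Such a function could then be supplied under `::on-committed` in the options map.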

committed-offset

(committed-offset consumer [topic partition :as topic-partition])
(committed-offset consumer [topic partition] options)

Requests the last committed offset for a [topic partition] (nil if nothing has been committed).

May block if the partition is not assigned to this consumer or if the consumer hasn't yet initialized its cache of committed
offsets.

A map of options may be given :

   :dvlopt.kafka/timeout
   Cf. `dvlopt.kafka` for description of time intervals

consumer

(consumer)
(consumer options)

Builds a Kafka consumer.

<!> Consumers are NOT thread safe !
    Use one consumer per thread, or implement a queueing policy.

A map of options may be given :

  ::configuration
    Map of Kafka consumer properties.
    Cf. https://kafka.apache.org/documentation/#newconsumerconfigs

  :dvlopt.kafka/nodes
    List of [host port].

  :dvlopt.kafka/deserializer.key
    Prepared deserializer or a function coercing a byte-array to a key (cf. `deserializer`).

  :dvlopt.kafka/deserializer.value
    Same, but for values.


Ex. (consumer {::configuration                  {"group.id"           "my-group"
                                                 "enable.auto.commit" false}
               :dvlopt.kafka/nodes              [["some_host" 9092]]
               :dvlopt.kafka/deserializer.key   :string
               :dvlopt.kafka/deserializer.value (fn [data _metadata]
                                                  (some-> data
                                                          nippy/thaw))})
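
The function form of a deserializer can be sketched as follows. This is a minimal illustration assuming the data is UTF-8 encoded text; `utf8-deserializer` is a hypothetical name, and the second argument is ignored because its contents are not described here.

```clojure
;; Hypothetical deserializer function, for illustration only.

(defn utf8-deserializer
  "Coerces a byte array to a String. Returns nil when there is no data
   (for instance a record without a key)."
  [data _metadata]
  (when data
    (String. ^bytes data "UTF-8")))
```

It has the same two-argument shape as the anonymous function given for `:dvlopt.kafka/deserializer.value` in the example above.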

end-offsets

(end-offsets consumer topic-partitions)
(end-offsets consumer topic-partitions options)

Requests a map of [topic partition] -> end offset (i.e. the offset of the last message + 1).

Works like `beginning-offsets`.
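
Since both functions return maps keyed by [topic partition], their results can be combined with plain map operations. The sketch below uses a hypothetical helper, `offset-spans`, to compute how many offsets lie between the first available offset and the end offset of each partition. This is only an upper bound on the number of records, since compaction or transaction markers can leave gaps.

```clojure
;; Hypothetical helper, for illustration only.

(defn offset-spans
  "Given a map of [topic partition] -> first available offset and a map of
   [topic partition] -> end offset, returns a map of [topic partition] ->
   number of offsets in between. Partitions missing from `end` are skipped."
  [beginning end]
  (into {}
        (for [[topic-partition first-offset] beginning
              :let  [end-offset (get end topic-partition)]
              :when end-offset]
          [topic-partition (- end-offset first-offset)])))
```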

fast-forward

(fast-forward consumer)
(fast-forward consumer topic-partitions)

Fast forwards a consumer to the end of the given [topic partition]'s.

Actually happens lazily on the next call to `poll` or `next-offset`.

If no [topic partition] is supplied, applies to all currently assigned partitions.

If the consumer was configured with :isolation.level = "read_committed", the latest offsets will be
the "last stable" ones.


Ex. ;; fast-forward all currently registered [topic partition]'s

    (fast-forward consumer)

    ;; fast-forward specific currently registered [topic partition]'s

    (fast-forward consumer
                  [["my-topic"      0]
                   ["another-topic" 3]])

metrics

(metrics consumer)

Requests metrics about this consumer, exactly like `dvlopt.kafka.out/metrics`.

next-offset

(next-offset consumer [topic partition :as topic-partition])
(next-offset consumer [topic partition] options)

Given a [topic partition], requests the offset of the next record this consumer can consume.

Issues a remote call only if there is no current position for the requested [topic partition].


A map of options may be given :

   :dvlopt.kafka/timeout
   Cf. `dvlopt.kafka` for description of time intervals


Ex. (next-offset consumer
                 ["my-topic" 0]
                 {:dvlopt.kafka/timeout [5 :seconds]})

offsets-for-timestamps

(offsets-for-timestamps consumer topic-partition->timestamp)
(offsets-for-timestamps consumer topic-partition->timestamp options)

Requests a map of [topic partition] -> map containing :

  :dvlopt.kafka/offset 
   Earliest offset whose timestamp is greater than or equal to the given one for that [topic partition].

  :dvlopt.kafka/timestamp
   Timestamp of the record at that offset.


<!> Blocks forever if the topic doesn't exist and dynamic creation has been disabled server-side.


A map of options may be given :

   :dvlopt.kafka/timeout
   Cf. `dvlopt.kafka` for description of time intervals


Ex. (offsets-for-timestamps consumer
                            {["my-topic"      0] 1507812268270
                             ["another-topic" 3] 1507812338294}
                            {:dvlopt.kafka/timeout [5 :seconds]})
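
The returned map can be reshaped into the [topic partition] -> offset form that `seek` accepts. The helper below, `timestamp-result->offsets`, is a hypothetical sketch. It assumes that a partition without a matching record may map to nil and simply drops such entries; that behaviour is an assumption, not something stated here.

```clojure
;; Hypothetical helper, for illustration only.

(defn timestamp-result->offsets
  "Turns a map of [topic partition] -> {:dvlopt.kafka/offset ...} into a map
   of [topic partition] -> offset, skipping entries without an offset."
  [topic-partition->result]
  (into {}
        (keep (fn [[topic-partition result]]
                (when-some [offset (:dvlopt.kafka/offset result)]
                  [topic-partition offset])))
        topic-partition->result))
```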

partitions

(partitions consumer topic)
(partitions consumer topic options)

Requests a list of partitions for a given topic.

For the returned value, cf. `dvlopt.kafka.out/partitions`.

A map of options may be given :

   :dvlopt.kafka/timeout
   Cf. `dvlopt.kafka` for description of time intervals

pause

(pause consumer topic-partitions)

Suspends fetching records from the given [topic partition]'s until `resume` is called.

This function does not affect subscriptions nor does it trigger a consumer group rebalance.


Ex. (pause consumer
           [["my-topic"      0]
            ["another-topic" 3]])

paused

(paused consumer)

Returns a set of the currently paused [topic partition]'s.

poll

(poll consumer)
(poll consumer options)

Synchronously polls records from the registered sources.


A map of options may be given :

   :dvlopt.kafka/timeout
   An empty interval such as [0 :milliseconds] returns what is available in the consumer buffer without blocking.
   Cf. `dvlopt.kafka` for description of time intervals


A record is a map containing :
 
  :dvlopt.kafka/key
   Deserialized key.

  :dvlopt.kafka/offset
   Record offset.

  :dvlopt.kafka/partition
   Partition number.

  :dvlopt.kafka/timestamp
   Unix timestamp.

  :dvlopt.kafka/topic
   Topic name.

  :dvlopt.kafka/value
   Deserialized value.
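
Because records are plain maps, the offsets to commit after processing a batch can be derived with ordinary Clojure. The sketch below uses a hypothetical helper, `records->commit-offsets`, which builds a [topic partition] -> offset map in the shape expected by `::topic-partition->offset`. It assumes the usual Kafka convention that the committed offset is the offset of the next record to consume, hence the `inc`; whether this library expects that convention is an assumption.

```clojure
;; Hypothetical helper, for illustration only.

(defn records->commit-offsets
  "Given a sequence of records, returns a map of [topic partition] ->
   (highest offset seen + 1)."
  [records]
  (reduce (fn [acc record]
            (update acc
                    [(:dvlopt.kafka/topic record)
                     (:dvlopt.kafka/partition record)]
                    ;; No entry yet for this [topic partition]: start from 0.
                    (fnil max 0)
                    (inc (:dvlopt.kafka/offset record))))
          {}
          records))
```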

poll-seq

(poll-seq consumer)
(poll-seq consumer options)

Returns a sequence of records obtained by lazily and continuously calling `poll` under the hood.


Ex. (take 5
          (poll-seq consumer
                    {:dvlopt.kafka/timeout [5 :seconds]}))

poll-topic-partitions

(poll-topic-partitions consumer)
(poll-topic-partitions consumer options)

Behaves exactly like `poll` but returns a map of [topic partition] -> list of records.

More efficient when records are dispatched to workers by [topic partition].
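
To make the returned shape concrete, the sketch below regroups a flat sequence of records by [topic partition] in plain Clojure. `by-topic-partition` is a hypothetical helper that only illustrates the shape of the result; it says nothing about how this function is implemented.

```clojure
;; Hypothetical helper, for illustration only.

(defn by-topic-partition
  "Groups records, as returned by `poll`, into a map of
   [topic partition] -> vector of records."
  [records]
  (group-by (juxt :dvlopt.kafka/topic
                  :dvlopt.kafka/partition)
            records))
```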

register-for

(register-for consumer source)
(register-for consumer source options)

A consumer can either be assigned to a list of specific [topic partition]'s by the user or subscribe
to a list of topics or a regular expression describing topics. When subscribing, the [topic partition]'s
are dynamically assigned by the broker. Assignments change over time. For instance, when another consumer
from the same consumer group subscribes to the same topics, [topic partition]'s are rebalanced between
consumers so that none of them is overloaded or left idle.

Every time this function is called, the consumer is unregistered from previous sources if there are any.

A map of options may be given :

  ::on-rebalance
   Callback in order to be informed in case of dynamic :assignment or :revocation (first argument) regarding
   [topic partition]'s (second argument).


Ex. ;; dynamic subscription by regular expression

    (register-for consumer
                  #"topic-.+"
                  {::on-rebalance (fn [operation topic-partitions]
                                    (when (= operation
                                             :assignment)
                                      ...))})

    ;; dynamic subscription to given topics

    (register-for consumer
                  ["my-topic"
                   "another-topic"])

    ;; static assignment to specific [topic partition]'s

    (register-for consumer
                  [["my-topic"      0]
                   ["another-topic" 3]])
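
As an illustration of the `::on-rebalance` contract, the hypothetical callback below keeps a set of currently assigned [topic partition]'s in an atom. The names `assigned` and `on-rebalance` are illustrative, and the second argument is assumed to be a collection of [topic partition] vectors.

```clojure
;; Hypothetical helpers, for illustration only.

(def assigned
  "Set of [topic partition]'s currently assigned, as seen by the callback."
  (atom #{}))

(defn on-rebalance
  "Callback taking the operation (:assignment or :revocation) and the
   [topic partition]'s it concerns."
  [operation topic-partitions]
  (case operation
    :assignment (swap! assigned into topic-partitions)
    :revocation (swap! assigned #(apply disj % topic-partitions))
    ;; Any other operation is ignored.
    nil))
```

Such a function could then be supplied under `::on-rebalance` in the options map.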

registered-for

(registered-for consumer)

Returns a map containing :

::assignments
 Set of all [topic partition]'s the consumer is currently assigned to, either by the broker or by the user.

::subscriptions
 Set of all topics the consumer is subscribed to, if any.

registered-for?

(registered-for? consumer source)

Is the consumer listening to the given topic / [topic partition] ?

resume

(resume consumer)
(resume consumer topic-partitions)

Undoes `pause`.


Ex. ;; resumes everything that has been paused

    (resume consumer)

    ;; resumes only specific [topic partition]'s
 
    (resume consumer
            [["my-topic"      0]
             ["another-topic" 3]])

rewind

(rewind consumer)
(rewind consumer topic-partitions)

Rewinds a consumer to the first available offset for the given [topic partition]'s.

Actually happens lazily on the next call to `poll` or `next-offset`.


Ex. ;; rewind all currently registered [topic partition]'s
    
    (rewind consumer)

    ;; rewind only specific currently registered [topic partition]'s

    (rewind consumer
            [["my-topic"      0]
             ["another-topic" 3]])

seek

(seek consumer topic-partition->offset)

Moves the consumer to the new offsets.

Actually happens lazily on the next call to `poll` and variations.


Ex. (seek consumer
          {["my-topic"      0] 42
           ["another-topic" 3] 84})

topics

(topics consumer)
(topics consumer options)

Requests a list of metadata about partitions for all topics the consumer is authorized to consume.

Returns a map of topic -> list of metadata exactly like in `partitions`.

A map of options may be given :

   :dvlopt.kafka/timeout
   Cf. `dvlopt.kafka` for description of time intervals

unregister

(unregister consumer)

Unregisters a consumer from all the sources it is consuming.