milena.consume

Everything related to Kafka consumers.


close

(close consumer)
(close consumer timeout-ms)

Tries to close the consumer cleanly within the given timeout or a default one of 30 seconds.

It will try to complete pending commits and leave the group. If auto-commit is enabled,
the current offsets will be committed. If those operations aren't completed when the timeout
is reached, the consumer will be force closed.

Note that `unblock` cannot be used to interrupt this fn.


@ consumer
  Kafka consumer.

@ timeout-ms
  Timeout in milliseconds.

=> nil


Throws

  org.apache.kafka.common.errors

    InterruptException
      When the thread is interrupted while blocked.
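Since `close` should always run, even when consumption throws, a common pattern is to call it from a `finally` clause on the thread that owns the consumer. The sketch below is a generic helper and not part of milena: `with-resource` is a hypothetical name, and the open and close fns are passed in so the pattern can be tried without a broker.

```clojure
(defn with-resource
  "Calls `(f resource)` where resource is the result of `(open)`, and always
  calls `(close resource)` afterwards, even if `f` throws."
  [open close f]
  (let [resource (open)]
    (try
      (f resource)
      (finally
        (close resource)))))

;; Trying it with a stand-in resource (no Kafka needed).
(def events (atom []))

(def result
  (with-resource (fn [] (swap! events conj :opened) :fake-consumer)
                 (fn [_] (swap! events conj :closed))
                 (fn [c] (swap! events conj [:used c]) :done)))

(comment
  ;; With milena, assuming `milena.consume` is required as `M.consume`:
  (with-resource #(M.consume/make {:config {:group.id "my_group"}})
                 #(M.consume/close % 5000)
                 (fn [consumer]
                   (M.consume/poll consumer 1000))))
```

The close fn runs last whether `f` returns or throws, so the consumer gets a chance to leave the group cleanly.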

commit-async

(commit-async consumer)
(commit-async consumer callback)
(commit-async consumer callback offsets)

Asynchronously commits offsets to Kafka.

If none are given, commits offsets from the last call to `poll`.

The committed offsets will be used on the first fetch after every assignment and also on startup.
As such, if the user needs to store offsets anywhere else, this fn should not be used.

The commit is actually sent on the next trip to the server, such as the next call to `poll`.

The callback must accept as arguments a possible exception and a map of [topic partition] -> committed offset.


@ consumer
  Kafka consumer.

@ callback (nilable)
  Cf.

=> `consumer`


Ex. (commit-async consumer)

    (commit-async consumer
                  (fn [exception offsets]
                    ...))

    (commit-async consumer
                  (fn [exception offsets]
                    (when-not exception
                      (println "Should be true:"
                               (= (get offsets
                                       ["my-topic" 0])
                                  24))))
                  {["my-topic"      0] 24
                   ["another-topic" 3] 84})


Throws

  Cf. `commit-sync` for exceptions

commit-sync

(commit-sync consumer)
(commit-sync consumer offsets)

Synchronously commits offsets to Kafka.

If none are given, commits offsets from the last call to `poll`.

The committed offsets will be used on the first fetch after every assignment and also on startup.
As such, if the user needs to store offsets anywhere else, this fn should not be used.


@ consumer
  Kafka consumer.

@ offsets
  Map of [topic partition] to offsets.

=> `consumer`


Ex. (commit-sync consumer)

    (commit-sync consumer
                 {["my-topic"      0] 24
                  ["another-topic" 3] 84})


Throws

  org.apache.kafka.clients.consumer

    CommitFailedException
      When the commit failed and cannot be retried (only occurs when using subscriptions or if there is an active
      group with the same groupId).

  org.apache.kafka.common.errors

    WakeupException
      When `unblock` is called before or while this fn is called.

    InterruptException
      When the calling thread is interrupted.

    AuthorizationException
      When not authorized to the specified topic.

    KafkaException
      Any other unrecoverable errors.
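When committing explicitly, the offset to commit for a partition is conventionally that of the next record to consume, i.e. the last processed offset + 1. A minimal sketch of building the `offsets` map from a batch of processed records follows. `next-offsets` is a hypothetical helper, and the `:topic`, `:partition` and `:offset` keys are assumptions about the record maps (check `milena.interop.clj/consumer-record` for the actual shape).

```clojure
(defn next-offsets
  "Given processed records, returns a map of [topic partition] to the offset
  of the next record to consume (highest processed offset + 1)."
  [records]
  (reduce (fn [acc {:keys [topic partition offset]}]
            ;; Keep the highest 'next offset' seen so far for this partition.
            (update acc
                    [topic partition]
                    (fnil max 0)
                    (inc offset)))
          {}
          records))

;; Assumed record shape, for illustration only.
(def processed
  [{:topic "my-topic"      :partition 0 :offset 22}
   {:topic "my-topic"      :partition 0 :offset 23}
   {:topic "another-topic" :partition 3 :offset 83}])

(def to-commit
  (next-offsets processed))
;; to-commit is {["my-topic" 0] 24, ["another-topic" 3] 84}

(comment
  ;; With milena, assuming `milena.consume` is required as `M.consume`:
  (M.consume/commit-sync consumer to-commit))
```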

committed

(committed consumer topic-partitions)
(committed consumer topic partition)

Gets the last committed offset for one or several partitions.

May block if the partition is not assigned to this consumer or if the consumer hasn't yet initialized its cache of committed
offsets.


@ consumer
  Kafka consumer.

---

@ topic-partitions
  List of [topic partition].
  Cf. `milena.interop.java/topic-partition`

=> Map of [topic partition] to offsets.

---

@ topic
  Topic name.

@ partition
  Partition number.

=> Offset.


Ex. (committed consumer
               "my-topic"
               0)
    => 42

    (committed consumer
               [["my-topic"      0]
                ["another-topic" 3]])
    => {["my-topic"      0] 42
        ["another-topic" 3] 84}
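Committed offsets are often compared with the latest offsets to estimate how far behind a consumer is. The sketch below assumes both inputs are maps of [topic partition] to offset, as returned by `committed` and `offs-latest`. `lag` is a hypothetical helper, and skipping partitions without a committed offset is a choice made here, not documented behavior.

```clojure
(defn lag
  "Map of [topic partition] to the number of records between the committed
  offset and the latest offset. Partitions without a committed offset are
  skipped."
  [latest committed]
  (reduce-kv (fn [acc tp end]
               (if-some [c (get committed tp)]
                 (assoc acc tp (- end c))
                 acc))
             {}
             latest))

(def example-lag
  (lag {["my-topic" 0] 100 ["another-topic" 3] 84}
       {["my-topic" 0] 42  ["another-topic" 3] 84}))
;; example-lag is {["my-topic" 0] 58, ["another-topic" 3] 0}

(comment
  ;; With milena, assuming `milena.consume` is required as `M.consume`:
  (let [tps [["my-topic" 0] ["another-topic" 3]]]
    (lag (M.consume/offs-latest consumer tps)
         (M.consume/committed consumer tps))))
```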

consumer?

(consumer? x)

Is `x` a consumer?

fast-forward

(fast-forward consumer)
(fast-forward consumer topic-partitions)
(fast-forward consumer topic partition)

Fast forwards a consumer to the latest offset (ie. offset of the last message + 1) for one or
several partitions.

Happens lazily on the next call to `poll` or `offs-current`.

If no partition is given, applies to all currently assigned partitions.


@ consumer
  Kafka consumer.

---

@ topic-partitions
  List of [topic partition].
  Cf. `milena.interop.java/topic-partition`

---

@ topic
  Topic name.

@ partition
  Partition number.

---

=> `consumer`


Ex. (fast-forward consumer
                  "my-topic"
                  0)

    (fast-forward consumer
                  [["my-topic"      0]
                   ["another-topic" 3]])

listen

(listen consumer source)
(listen consumer source f-rebalance)

Subscribes a consumer to topics or assigns it to specific partitions.

An assignment refers precisely to a [topic partition], whereas a subscription only asks for a topic,
the partitions being assigned dynamically.

<!> A consumer can only consume from one type of source. This fn will throw if
    the user tries to mix, for instance, subscriptions and assignments, or regexes
    and strings.


@ consumer
  Kafka consumer.

@ source
  One of : regular expression representing topics to subscribe to
         | list of topics as strings to subscribe to
         | list of [topic partition] to be assigned to
         | nil

@ f-rebalance
  Optional fn for subscriptions.
  Cf. `milena.interop.java/consumer-rebalance-listener`



Ex. (listen consumer
            #"topic-.+"
            (fn [assigned? topic-partitions]
              (when assigned?
                ...)))

    (listen consumer
            ["my-topic"
             "another-topic"])

    (listen consumer
            [["my-topic"      0]
             ["another-topic" 3]])

    (listen consumer
            nil)
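The accepted kinds of `source` can be told apart by shape. The sketch below is a hypothetical helper for validating a source before calling `listen`. It is not milena's implementation, and the `:none` result for nil only labels the input without saying what `listen` does with it.

```clojure
(defn source-kind
  "Classifies a `source` argument by shape. Throws when a list mixes topic
  names and [topic partition] pairs, or when the value is unsupported."
  [source]
  (cond
    (nil? source)
    :none

    (instance? java.util.regex.Pattern source)
    :regex

    (and (sequential? source)
         (every? string? source))
    :topics

    (and (sequential? source)
         (every? vector? source))
    :partitions

    :else
    (throw (ex-info "Mixed or unsupported source"
                    {:source source}))))

(def kinds
  (mapv source-kind
        [#"topic-.+"
         ["my-topic" "another-topic"]
         [["my-topic" 0] ["another-topic" 3]]
         nil]))
;; kinds is [:regex :topics :partitions :none]
```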

listening

(listening consumer)

Gets all the partitions the consumer is assigned to or its subscriptions.


@ consumer
  Kafka consumer.

=> {:partitions
     Set of topic-partitions.
     Cf. `milena.interop.clj/topic-partition`

    :subscriptions
     Set of topic names.}


Cf. `listen`

listening?

(listening? consumer source)

Is the consumer listening to the given topic / [topic partition]?

make

(make)
(make {:as opts
       :keys [nodes config deserializer deserializer-key deserializer-value]
       :or {nodes [["localhost" 9092]]
            deserializer M.deserialize/byte-array
            deserializer-key deserializer
            deserializer-value deserializer}})

Builds a Kafka consumer.

<!> Consumers are NOT thread-safe!
    Use 1 consumer / thread, or implement a queueing policy.


@ opts (nilable)
  {:nodes (nilable)
    List of [host port].

   :config (nilable)
    Kafka configuration map.
    Cf. https://kafka.apache.org/documentation/#newconsumerconfigs

   :deserializer (nilable)
    Kafka deserializer or fn eligible for becoming one.
    Cf. `milena.deserialize`
        `milena.deserialize/make`

   :deserializer-key (nilable)
    Defaults to `:deserializer`.

   :deserializer-value (nilable)
    Defaults to `:deserializer`.}

=> org.apache.kafka.clients.consumer.KafkaConsumer


Ex. (make {:nodes              [["some_host" 9092]]
           :config             {:group.id           "my_group"
                                :enable.auto.commit false}
           :deserializer-key   milena.deserialize/string
           :deserializer-value (fn [_ data]
                                 (nippy/thaw data))
           :listen             [["my-topic" 3]]})

metrics

(metrics consumer)

Gets metrics about this consumer.

@ consumer
  Kafka consumer.

=> Cf. `milena.interop.clj/metrics`

offs-by-ts

(offs-by-ts consumer timestamps)
(offs-by-ts consumer topic partition ts)

Finds offsets for the given partition(s) by timestamp, ie. the earliest offsets whose timestamp
is greater than or equal to the corresponding ones.

<!> Blocks forever if the topic doesn't exist and dynamic creation has been disabled server-side.


@ consumer
  Kafka consumer.

---

@ timestamps
  Map of [topic partition] to timestamp.
  Cf. `milena.interop.java/topic-partition`

=> Map of [topic partition] to results.

   + results
     {:timestamp
       Unix timestamp of the record.

      :offset
       Offset in the partition.}

---

@ topic
  Topic name.

@ partition
  Partition number.

@ ts
  Unix timestamp in milliseconds.

=> Offset.


Ex. (offs-by-ts consumer
                "my-topic"
                0
                1507812268270)
    => 42

    (offs-by-ts consumer
                {["my-topic"      0] 1507812268270
                 ["another-topic" 3] 1507812338294})
    => {["my-topic"      0] {:timestamp 1507812268270
                             :offset    42}
        ["another-topic" 3] {:timestamp 1507812338294
                             :offset    84}}
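A typical use of these results is to reposition a consumer at a point in time by feeding the offsets to `seek`, which expects a map of [topic partition] to offset. The sketch below only reshapes the data. `results->offsets` is a hypothetical helper, and dropping entries that have no `:offset` is an assumption about how missing results appear.

```clojure
(defn results->offsets
  "Turns a map of [topic partition] to {:timestamp ... :offset ...} into a
  map of [topic partition] to offset, skipping entries without an offset."
  [results]
  (reduce-kv (fn [acc tp {:keys [offset]}]
               (if (some? offset)
                 (assoc acc tp offset)
                 acc))
             {}
             results))

(def seek-offsets
  (results->offsets {["my-topic"      0] {:timestamp 1507812268270
                                          :offset    42}
                     ["another-topic" 3] {:timestamp 1507812338294
                                          :offset    84}}))
;; seek-offsets is {["my-topic" 0] 42, ["another-topic" 3] 84}

(comment
  ;; With milena, assuming `milena.consume` is required as `M.consume`:
  (M.consume/seek consumer
                  (results->offsets
                    (M.consume/offs-by-ts consumer
                                          {["my-topic" 0] 1507812268270}))))
```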

offs-current

(offs-current consumer topic-partitions)
(offs-current consumer topic partition)

Gets the current offset of a consumer on one or several partitions (ie. the offset of the next record).


@ consumer
  Kafka consumer.

---

@ topic-partitions
  List of [topic partition].
  Cf. `milena.interop.java/topic-partition`

=> Map of [topic partition] to offsets. 

---

@ topic
  Topic name.

@ partition
  Partition number.

=> Position. 


Ex. (offs-current consumer
                  "my-topic"
                  0)
    => 42

    (offs-current consumer
                  [["my-topic"      0]
                   ["another-topic" 3]])
    => {["my-topic"      0] 42
        ["another-topic" 3] 84}


Throws

  org.apache.kafka.common.errors

    WakeupException
      When `unblock` is called before or while this fn is called.

    InterruptException
      When the calling thread is interrupted.

    AuthorizationException
      When not authorized to the specified topic.

    TimeoutException
      When the topic metadata could not be fetched before expiration of the configured request timeout.

    KafkaException
      Any other unrecoverable errors.

offs-first

(offs-first consumer topic-partitions)
(offs-first consumer topic partition)

Finds the first available offset of the given topic-partition(s).

<!> Blocks forever if the topic doesn't exist and dynamic creation has been disabled server-side.


@ consumer
  Kafka consumer.

---

@ topic-partitions
  List of [topic partition].
  Cf. `milena.interop.java/topic-partition`

=> Map of [topic partition] to offset.

---

@ topic
  Topic name.

@ partition
  Partition number.

=> Offset.


Ex. (offs-first consumer
                "my-topic"
                0)
    => 421

    (offs-first consumer
                [["my-topic"      0]
                 ["another-topic" 3]])
    => {["my-topic"      0] 421
        ["another-topic" 3] 842}

offs-latest

(offs-latest consumer topic-partitions)
(offs-latest consumer topic partition)

Finds the latest offset of the given partition(s), ie. the offset of the last message + 1.

@ consumer
  Kafka consumer.

---

@ topic-partitions
  List of [topic partition].
  Cf. `milena.interop.java/topic-partition`

=> Map of [topic partition] to offset.

---

@ topic
  Topic name.

@ partition
  Partition number.

=> Offset.


Cf. `offs-first`

partitions

(partitions consumer topic)

Gets a list of partitions for a given topic.


@ consumer
  Kafka consumer.

@ topic
  Topic name.

=> List of partitions.
   Cf. `interop.clj/partition-info`


Throws

  org.apache.kafka.common.errors

    WakeupException
      When `unblock` is called before or while this fn is called.

    InterruptException
      When the calling thread is interrupted.

    AuthorizationException
      When not authorized to the specified topic.

    TimeoutException
      When the topic metadata could not be fetched before expiration of the configured request timeout.

    KafkaException
      Any other unrecoverable errors.

pause

(pause consumer topic-partitions)
(pause consumer topic partition)

Temporarily pauses consumption.


@ consumer
  Kafka consumer.

---

@ topic-partitions
  List of [topic partition].
  Cf. `milena.interop.java/topic-partition`

---

@ topic
  Topic name.

@ partition
  Partition number.

---

=> `consumer`


Ex. (pause consumer
           "my-topic"
           0)

    (pause consumer
           [["my-topic"      0]
            ["another-topic" 3]])

paused

(paused consumer)

@ consumer
  Kafka consumer.

=> Set of [topic partition] currently paused.
   Cf. `pause`

paused?

(paused? consumer topic-partition)
(paused? consumer topic partition)

Is a [topic partition] currently paused?

Cf. `pause`
    `paused`

poll

(poll consumer)
(poll consumer timeout-ms)

Synchronously polls records.


@ consumer
  Kafka consumer.

@ timeout-ms (nilable)
  Optional timeout in milliseconds.
    Nil will wait forever.
    0   returns what is available in the consumer buffer without blocking.

=> Sequence of individual records.
   Cf. `milena.interop.clj/consumer-record`


Throws

  org.apache.kafka.clients.consumer

    InvalidOffsetException
      When the offset for a partition or set of partitions is undefined or out of range and no offset
      reset policy has been configured.

  org.apache.kafka.common.errors

    WakeupException
      When `unblock` is called while blocking.

    InterruptException
      When the calling thread is interrupted while blocking.

    AuthorizationException
      When not authorized to any of the assigned topics or to the configured groupId.

    KafkaException
      Any other unrecoverable errors (eg. deserializing key/value).

poll-partitions

(poll-partitions consumer)
(poll-partitions consumer timeout-ms)

Synchronously polls records by partitions.

Behaves exactly like `poll` but returns a map of [topic partition] to sequence of individual records.

More efficient when the consumer polls several partitions in order to dispatch the results to workers.


Cf. `poll`
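To illustrate dispatching to workers, the sketch below hands each partition's batch to its own future and collects the results. `dispatch` is a hypothetical helper, not part of milena, and the batches are plain data standing in for the result of `poll-partitions`. Since consumers are not thread-safe, the workers should only touch the records, never the consumer itself.

```clojure
(defn dispatch
  "Calls `(handler topic-partition records)` once per partition, each in its
  own future, and returns a map of [topic partition] to the handler's result."
  [batches handler]
  (let [;; Start every worker first (`into` is eager)...
        tasks (into {}
                    (map (fn [[tp records]]
                           [tp (future (handler tp records))]))
                    batches)]
    ;; ...then wait for all of them.
    (reduce-kv (fn [acc tp task]
                 (assoc acc tp @task))
               {}
               tasks)))

(def counts
  (dispatch {["my-topic"      0] [:record-a :record-b]
             ["another-topic" 3] [:record-c]}
            (fn [_tp records]
              (count records))))
;; counts is {["my-topic" 0] 2, ["another-topic" 3] 1}

(comment
  ;; With milena, assuming `milena.consume` is required as `M.consume`
  ;; and a `handle-batch` fn of your own:
  (dispatch (M.consume/poll-partitions consumer 1000)
            handle-batch))
```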

poll-seq

(poll-seq consumer)
(poll-seq consumer timeout-ms)

Converts a consumer to a sequence of individual records by lazily and continuously calling `poll`.


Ex. (take 5
          (poll-seq consumer))


Cf. `poll` for arguments and exceptions.
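The idea of lazily and continuously polling can be sketched with `lazy-seq`. This is not milena's implementation: `lazy-polls` and `fake-poll` are hypothetical names, and the poll fn is a stand-in returning small batches so the sketch runs without a broker.

```clojure
(defn lazy-polls
  "Lazy sequence of the items produced by repeatedly calling `poll-fn`,
  a zero-argument fn returning a (possibly empty) batch."
  [poll-fn]
  (lazy-seq
    (concat (poll-fn)
            (lazy-polls poll-fn))))

;; Stand-in for polling: each call returns a batch of two numbers.
(def polls (atom 0))

(defn fake-poll
  []
  (let [n (swap! polls inc)]
    [(* 10 n) (inc (* 10 n))]))

(def first-five
  (doall (take 5
               (lazy-polls fake-poll))))
;; first-five is (10 11 20 21 30)
```

A new batch is only requested once the previous one has been consumed, which is why an endless sequence like this one can be used safely with `take`.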

resume

(resume consumer)
(resume consumer topic-partitions)
(resume consumer topic partition)

Resumes consumption.

Resumes everything if no other arg than the consumer is provided.


@ consumer
  Kafka consumer.

---

@ topic-partitions
  List of [topic partition].
  Cf. `milena.interop.java/topic-partition`

---

@ topic
  Topic name.

@ partition
  Partition number.

---

=> `consumer`


Ex. (resume consumer)
 
    (resume consumer
            "my-topic"
            0)

    (resume consumer
            [["my-topic"      0]
             ["another-topic" 3]])


Cf. `pause`

rewind

(rewind consumer)
(rewind consumer topic-partitions)
(rewind consumer topic partition)

Rewinds a consumer to the first available offset for one or several partitions.

Happens lazily on the next call to `poll` or `offs-current`.

If no partition is given, applies to all currently assigned partitions.


@ consumer
  Kafka consumer.

---

@ topic-partitions
  List of [topic partition].
  Cf. `milena.interop.java/topic-partition`

---

@ topic
  Topic name.

@ partition
  Partition number.

---

=> `consumer`


Ex. (rewind consumer
            "my-topic"
            0)

    (rewind consumer
            [["my-topic"      0]
             ["another-topic" 3]])

safely (macro)

(safely consumer timeout-ms & body)

If the body has not finished evaluating before the timeout (in milliseconds) expires, `unblock` is called on the consumer.
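The behavior described above can be approximated with a watchdog thread. The sketch below is one possible way to get it and is not how `safely` is implemented: `call-with-watchdog` is a hypothetical fn, and it takes fns rather than a body so it can be tried without a consumer.

```clojure
(defn call-with-watchdog
  "Calls `(f)`. If it has not returned within `timeout-ms`, `(on-timeout)`
  is called once from another thread. Returns the result of `(f)`."
  [timeout-ms on-timeout f]
  (let [done (promise)]
    ;; Watchdog: waits for completion, fires only if the wait times out.
    (future
      (when (= ::timed-out
               (deref done timeout-ms ::timed-out))
        (on-timeout)))
    (try
      (f)
      (finally
        ;; Tells the watchdog that the call is over, whatever the outcome.
        (deliver done true)))))

(comment
  ;; With milena, assuming `milena.consume` is required as `M.consume`.
  ;; If the commit blocks for more than 5 seconds, `unblock` makes it throw
  ;; a WakeupException.
  (call-with-watchdog 5000
                      #(M.consume/unblock consumer)
                      #(M.consume/commit-sync consumer)))
```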

seek

(seek consumer offsets)
(seek consumer topic partition offset)

Seeks one or several partitions to a new offset (ie. offset of the next record).

Happens lazily on the next call to `poll`.

Note that you may lose data if this fn is arbitrarily called in the middle of consumption.


@ consumer
  Kafka consumer.

---

@ offsets
  Map of [topic partition] to offset.
  Cf. `milena.interop.java/topic-partition`

---

@ topic
  Topic name.

@ partition
  Partition number.

@ offset
  New offset.

---

=> `consumer`


Ex. (seek consumer
          "my-topic"
          0
          42)

    (seek consumer
          {["my-topic"      0] 42
           ["another-topic" 3] 84})

topics

(topics consumer)

Gets a list of metadata about partitions for all the topics the consumer is authorized to consume.


@ consumer
  Kafka consumer.

=> Map of topic names to partition infos.
   Cf. `M.interop.clj/partition-info`


Throws

  org.apache.kafka.common.errors

    WakeupException
      When `unblock` is called before or while this fn is called.

    InterruptException
      When the calling thread is interrupted before or while this fn is called.

    TimeoutException
      When the topic metadata could not be fetched before expiration of the configured request timeout.

    KafkaException
      Any other unrecoverable errors.

unblock

(unblock consumer)

From another thread, unblocks the consumer.

The blocking thread will throw an org.apache.kafka.common.errors.WakeupException.

If the thread is not blocking on a fn which can throw such an exception, the next call
to such a fn will raise it instead.

Must be used sparingly, not to compensate for a bad design.


@ consumer
  Kafka consumer.

=> `consumer`