kafka-metamorphosis.consumer

Kafka consumer functions with an idiomatic Clojure interface.


assign! (clj)

(assign! consumer partition-maps)

Manually assign the consumer to specific topic partitions.

Usage:
(assign! consumer [{:topic "my-topic" :partition 0}
                   {:topic "my-topic" :partition 1}])
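
When a topic has several partitions, writing each partition map by hand gets repetitive. The following is a minimal sketch of building that vector programmatically. `partition-maps` is a hypothetical helper, not part of kafka-metamorphosis, and `my-consumer` stands for a consumer obtained from `create`.

```clojure
;; Hypothetical helper (not part of kafka-metamorphosis): builds the vector
;; of partition maps in the shape that assign! is documented to accept.
(defn partition-maps
  "Return [{:topic topic :partition 0} ... {:topic topic :partition (dec n)}]."
  [topic n]
  (mapv (fn [p] {:topic topic :partition p}) (range n)))

(comment
  ;; Needs the library on the classpath and a reachable broker.
  (require '[kafka-metamorphosis.consumer :as consumer])
  (consumer/assign! my-consumer (partition-maps "my-topic" 2)))
```

The `comment` form keeps the library call out of evaluation, so the helper can be loaded and tried at a REPL without a broker.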

close! (clj)

(close! consumer)
(close! consumer timeout-ms)

Close the consumer and release resources.
Optionally specify timeout in milliseconds.
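
Because closing releases resources, it is usually worth making sure `close!` runs even when the work in between throws. One possible pattern is sketched below. `with-cleanup` is a hypothetical helper written for this illustration, and the 5000 ms timeout is an arbitrary example value.

```clojure
;; Hypothetical helper, not part of kafka-metamorphosis: run body-fn with a
;; resource and always call close-fn afterwards, even if body-fn throws.
(defn with-cleanup
  [resource close-fn body-fn]
  (try
    (body-fn resource)
    (finally
      (close-fn resource))))

(comment
  ;; Needs the library on the classpath and a reachable broker.
  (require '[kafka-metamorphosis.consumer :as consumer])
  (with-cleanup my-consumer
                #(consumer/close! % 5000)
                (fn [c] (consumer/poll! c 1000))))
```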

commit-async! (clj)

(commit-async! consumer)
(commit-async! consumer callback-fn)

Commit the current consumed offsets asynchronously.
The optional callback function receives a map of topic-partitions to offsets and the exception, if any.

Usage:
(commit-async! consumer)
(commit-async! consumer (fn [offsets exception]
                         (when exception
                           (println "Commit failed:" (.getMessage exception)))))
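
Printing from the callback is fine for a demo, but failures are easier to inspect later if they are collected. Here is a sketch of a callback with the documented `[offsets exception]` arity that records failures in an atom. `commit-failures` and `record-commit-failure` are hypothetical names, and the offsets map is stored as-is because its key shape is not specified here.

```clojure
;; Hypothetical state and callback, not part of kafka-metamorphosis.
(def commit-failures (atom []))

(defn record-commit-failure
  "Callback for commit-async!: remember failed commits, ignore successes."
  [offsets ^Throwable exception]
  (when exception
    (swap! commit-failures conj {:offsets offsets
                                 :message (.getMessage exception)})))

(comment
  ;; Needs the library on the classpath and a reachable broker.
  (require '[kafka-metamorphosis.consumer :as consumer])
  (consumer/commit-async! my-consumer record-commit-failure))
```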

commit-sync! (clj)

(commit-sync! consumer)

Commit the current consumed offsets synchronously.
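
A common reason to commit synchronously is to commit only after a batch has been handled. The sketch below shows that ordering under the assumption that the consumer was created with `:enable-auto-commit false`. `process-then-commit!` is a hypothetical helper; the poll and commit steps are passed in as functions so the block loads without the library.

```clojure
;; Hypothetical helper, not part of kafka-metamorphosis.
(defn process-then-commit!
  "Poll once via poll-fn, pass every record to handler, then call commit-fn
  if anything was received. Returns the number of records handled."
  [poll-fn commit-fn handler]
  (let [records (poll-fn)]
    (run! handler records)
    (when (seq records)
      (commit-fn))
    (count records)))

(comment
  ;; Needs the library on the classpath and a reachable broker.
  (require '[kafka-metamorphosis.consumer :as consumer])
  (process-then-commit! #(consumer/poll! my-consumer 1000)
                        #(consumer/commit-sync! my-consumer)
                        (fn [record] (println "Received:" (:value record)))))
```

If the handler throws, the commit is skipped, so the same records would typically be delivered again after a restart.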

consume! (clj)

(consume! consumer
          {:keys [poll-timeout handler stop-fn]
           :or {poll-timeout 1000 stop-fn (constantly false)}})

Consume records continuously with a handler function.
The handler function receives each record map.

Options:
:handler - function called with each record map (no default)
:poll-timeout - timeout for each poll in milliseconds (default: 1000)
:stop-fn - function that returns true when consumption should stop (default: never stop)

Usage:
(consume! consumer 
          {:poll-timeout 1000
           :handler (fn [record] 
                     (println "Received:" (:value record)))
           :stop-fn #(some-condition?)})
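
The usage above leaves `some-condition?` undefined. One way to fill it in is a flag held in an atom, sketched below. `running?`, `stop?`, and `stop!` are hypothetical names, and the sketch assumes that `consume!` blocks the calling thread and calls `:stop-fn` with no arguments, as the `#(some-condition?)` form suggests.

```clojure
;; Hypothetical stop flag, not part of kafka-metamorphosis.
(def running? (atom true))

(defn stop?
  "Zero-argument predicate suitable for :stop-fn."
  []
  (not @running?))

(defn stop!
  "Ask the consume loop to finish."
  []
  (reset! running? false))

(comment
  ;; Needs the library on the classpath and a reachable broker.
  (require '[kafka-metamorphosis.consumer :as consumer])
  ;; Run the loop on another thread so the REPL stays usable.
  (def worker
    (future
      (consumer/consume! my-consumer
                         {:poll-timeout 1000
                          :handler (fn [record]
                                     (println "Received:" (:value record)))
                          :stop-fn stop?})))
  (stop!))
```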

create (clj)

(create config)

Create a Kafka consumer with the given configuration map.

Example config:
{:bootstrap-servers "localhost:9092"
 :group-id "my-consumer-group"
 :key-deserializer "org.apache.kafka.common.serialization.StringDeserializer"
 :value-deserializer "org.apache.kafka.common.serialization.StringDeserializer"
 :auto-offset-reset "earliest"
 :enable-auto-commit true}
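
Since the configuration is a plain map, shared defaults can be kept in one place and overridden per consumer with `merge`. The sketch below reuses the example values above. `default-consumer-config` and `consumer-config` are hypothetical names introduced for illustration.

```clojure
;; Hypothetical defaults, copied from the example config above.
(def default-consumer-config
  {:bootstrap-servers  "localhost:9092"
   :group-id           "my-consumer-group"
   :key-deserializer   "org.apache.kafka.common.serialization.StringDeserializer"
   :value-deserializer "org.apache.kafka.common.serialization.StringDeserializer"
   :auto-offset-reset  "earliest"
   :enable-auto-commit true})

(defn consumer-config
  "Return the defaults with any overrides applied on top."
  [overrides]
  (merge default-consumer-config overrides))

(comment
  ;; Needs the library on the classpath and a reachable broker.
  (require '[kafka-metamorphosis.consumer :as consumer])
  (def my-consumer
    (consumer/create (consumer-config {:group-id "orders"
                                       :enable-auto-commit false}))))
```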

poll! (clj)

(poll! consumer timeout-ms)

Poll for new records with the given timeout in milliseconds.
Returns a seq of maps with keys: :topic :partition :offset :key :value :timestamp

Usage:
(poll! consumer 1000) ; Poll for 1 second
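
Because the records are ordinary maps, a batch can be summarised with core sequence functions. As an illustration, the sketch below finds the highest offset seen per topic partition. `max-offsets` is a hypothetical helper and relies only on the `:topic`, `:partition`, and `:offset` keys listed above.

```clojure
;; Hypothetical helper, not part of kafka-metamorphosis.
(defn max-offsets
  "Return {[topic partition] highest-offset} for a batch of record maps."
  [records]
  (reduce (fn [acc {:keys [topic partition offset]}]
            ;; fnil supplies the current offset when the key is not yet present.
            (update acc [topic partition] (fnil max offset) offset))
          {}
          records))

(comment
  ;; Needs the library on the classpath and a reachable broker.
  (require '[kafka-metamorphosis.consumer :as consumer])
  (max-offsets (consumer/poll! my-consumer 1000)))
```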

seek! (clj)

(seek! consumer topic partition offset)

Seek to a specific offset for a given topic partition.

Usage:
(seek! consumer "my-topic" 0 100) ; Seek to offset 100 in partition 0
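
A typical use of seeking is resuming just after the last record an application processed. The sketch below turns a map of last-processed offsets into `seek!` arguments. `resume-positions` is a hypothetical helper, and the `{[topic partition] offset}` shape is an assumption made for this example. The underlying Java client requires a partition to be assigned before seeking on it; this sketch assumes the wrapper behaves the same way, so it calls `assign!` first.

```clojure
;; Hypothetical helper, not part of kafka-metamorphosis.
(defn resume-positions
  "Given {[topic partition] last-processed-offset}, return a vector of
  [topic partition next-offset] triples."
  [last-offsets]
  (mapv (fn [[[topic partition] offset]]
          [topic partition (inc offset)])
        last-offsets))

(comment
  ;; Needs the library on the classpath and a reachable broker.
  (require '[kafka-metamorphosis.consumer :as consumer])
  (consumer/assign! my-consumer [{:topic "my-topic" :partition 0}])
  (doseq [[topic partition offset] (resume-positions {["my-topic" 0] 99})]
    (consumer/seek! my-consumer topic partition offset)))
```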

subscribe! (clj)

(subscribe! consumer topics)

Subscribe to the given list of topics.

Usage:
(subscribe! consumer ["topic1" "topic2"])
(subscribe! consumer ["topic1"])