kafka-metamorphosis.consumer

Kafka consumer functions with idiomatic Clojure interface

assign!

(assign! consumer partition-maps)

Manually assign the consumer to specific topic partitions.

Usage:
(assign! consumer [{:topic "my-topic" :partition 0}
                   {:topic "my-topic" :partition 1}])
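
For topics with many partitions, the partition maps can be generated instead of written by hand. The following is a minimal sketch: `partition-maps` is a hypothetical helper, not part of kafka-metamorphosis, and the `assign!` call sits in a `comment` form because it needs a live consumer.

```clojure
(defn partition-maps
  "Hypothetical helper: build the vector of maps that assign! expects
  for a single topic."
  [topic partitions]
  (mapv (fn [p] {:topic topic :partition p}) partitions))

(comment
  ;; Assumes `consumer` was created elsewhere and a broker is reachable.
  (assign! consumer (partition-maps "my-topic" (range 3))))
```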

close!

(close! consumer)
(close! consumer timeout-ms)

Close the consumer and release resources.
Optionally specify timeout in milliseconds.

commit-async!

(commit-async! consumer)
(commit-async! consumer callback-fn)

Commit the current consumed offsets asynchronously.
The optional callback function receives a map of topic-partitions to offsets and an exception (nil if the commit succeeded).

Usage:
(commit-async! consumer)
(commit-async! consumer (fn [offsets exception]
                         (when exception
                           (println "Commit failed:" (.getMessage exception)))))
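
Instead of printing, a callback can record failures for later inspection. This is a sketch only: `commit-failures` and `record-commit-failure` are illustrative names, and it assumes the exception argument is nil on success, as the usage above implies.

```clojure
(def commit-failures
  "Illustrative atom collecting the messages of failed commits."
  (atom []))

(defn record-commit-failure
  "Callback with the two-argument shape shown above.
  Does nothing when exception is nil."
  [_offsets ^Throwable exception]
  (when exception
    (swap! commit-failures conj (.getMessage exception))))

(comment
  ;; Assumes `consumer` was created elsewhere and a broker is reachable.
  (commit-async! consumer record-commit-failure))
```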

commit-sync!

(commit-sync! consumer)

Commit the current consumed offsets synchronously.

consume!

(consume! consumer
          {:keys [poll-timeout handler stop-fn]
           :or {poll-timeout 1000 stop-fn (constantly false)}})

Consume records continuously with a handler function.
The handler function receives each record map.

Options:
:handler - function called with each record map (no default)
:poll-timeout - timeout for each poll in milliseconds (default: 1000)
:stop-fn - function that returns true when consumption should stop (default: never stop)

Usage:
(consume! consumer 
          {:poll-timeout 1000
           :handler (fn [record] 
                     (println "Received:" (:value record)))
           :stop-fn #(some-condition?)})
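
One way to supply a `:stop-fn` is to back it with an atom that another thread can flip. The sketch below uses illustrative names (`running?`, `received`, `handle-record`, `stop?`). It assumes `consume!` blocks the calling thread until the stop function returns true, and that the consumer is used only from the thread running the loop.

```clojure
(def running?
  "Illustrative flag; reset it to false to end the loop."
  (atom true))

(def received
  "Illustrative sink for the values of handled records."
  (atom []))

(defn handle-record
  "Handler: store the record's :value."
  [record]
  (swap! received conj (:value record)))

(defn stop?
  "Zero-argument stop function: true once running? is false."
  []
  (not @running?))

(comment
  ;; Assumes `consumer` was created and subscribed elsewhere.
  ;; Run the loop on another thread so the REPL stays free.
  (def worker
    (future
      (consume! consumer {:poll-timeout 500
                          :handler handle-record
                          :stop-fn stop?})))
  ;; Later, ask the loop to finish.
  (reset! running? false))
```

How quickly the loop notices the flag depends on when `consume!` checks `:stop-fn`, which the docstring does not specify. A shorter `:poll-timeout` is a reasonable hedge.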

create

(create config)

Create a Kafka consumer with the given configuration map.

Example config:
{:bootstrap-servers "localhost:9092"
 :group-id "my-consumer-group"
 :key-deserializer "org.apache.kafka.common.serialization.StringDeserializer"
 :value-deserializer "org.apache.kafka.common.serialization.StringDeserializer"
 :auto-offset-reset "earliest"
 :enable-auto-commit true
 :schemas true}  ; Enable automatic schema validation based on topic

Schema validation options:
- :schemas true - Auto-detect schema based on topic name (:topic/default or :topic)
- :schemas :my-schema - Use specific schema for all messages
- :schemas {"topic1" :schema1 "topic2" :schema2} - Per-topic schema mapping
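
When several consumers share most settings, the config map can be assembled from a common base. This is a minimal sketch: `base-config` and `consumer-config` are hypothetical names, and `:order-schema` stands in for a schema assumed to be registered elsewhere.

```clojure
(def base-config
  "Settings shared by every consumer in this illustration."
  {:bootstrap-servers "localhost:9092"
   :key-deserializer "org.apache.kafka.common.serialization.StringDeserializer"
   :value-deserializer "org.apache.kafka.common.serialization.StringDeserializer"
   :auto-offset-reset "earliest"
   :enable-auto-commit true})

(defn consumer-config
  "Hypothetical helper: merge a group id and per-consumer overrides
  onto base-config. Later maps win on key conflicts."
  [group-id overrides]
  (merge base-config {:group-id group-id} overrides))

(comment
  ;; Per-topic schema mapping, as described in the options above.
  (create (consumer-config "orders-group"
                           {:schemas {"orders" :order-schema}})))
```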

poll!

(poll! consumer timeout-ms)

Poll for new records, waiting up to the given timeout in milliseconds.
Returns a seq of maps with keys: :topic :partition :offset :key :value :timestamp

If the consumer was created with :schemas config, automatic validation will be performed
and invalid messages will be filtered out.

Usage:
(poll! consumer 1000) ; Poll for 1 second
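
Because the result is a plain seq of maps, ordinary sequence functions apply to it. The sketch below summarizes a batch by the highest offset seen per topic partition. `max-offsets` is a hypothetical helper, and the `comment` form assumes a consumer created with `:enable-auto-commit false`.

```clojure
(defn max-offsets
  "Highest offset seen per [topic partition] in a seq of record maps."
  [records]
  (reduce (fn [acc {:keys [topic partition offset]}]
            ;; fnil substitutes this record's offset when the key is new.
            (update acc [topic partition] (fnil max offset) offset))
          {}
          records))

(comment
  ;; Assumes `consumer` was created and subscribed elsewhere.
  (let [records (poll! consumer 1000)]
    (doseq [r records]
      (println (:key r) (:value r)))
    ;; Commit only after the whole batch has been handled.
    (when (seq records)
      (commit-sync! consumer))
    (max-offsets records)))
```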

seek!

(seek! consumer topic partition offset)

Seek to a specific offset for a given topic partition.

Usage:
(seek! consumer "my-topic" 0 100) ; Seek to offset 100 in partition 0

subscribe!

(subscribe! consumer topics)

Subscribe to the given list of topics.

Usage:
(subscribe! consumer ["topic1" "topic2"])
(subscribe! consumer ["topic1"])
