Kafka consumer functions with idiomatic Clojure interface

(assign! consumer partition-maps)
Manually assign the consumer to specific topic partitions.
Usage:
(assign! consumer [{:topic "my-topic" :partition 0}
                   {:topic "my-topic" :partition 1}])

(close! consumer)
(close! consumer timeout-ms)
Close the consumer and release resources. Optionally specify a timeout in milliseconds.

(commit-async! consumer)
(commit-async! consumer callback-fn)
Commit the current consumed offsets asynchronously.
Optional callback function receives a map of topic-partitions to offsets and any exception.
Usage:
(commit-async! consumer)
(commit-async! consumer (fn [offsets exception]
                          (when exception
                            (println "Commit failed:" (.getMessage exception)))))

(commit-sync! consumer)
Commit the current consumed offsets synchronously.
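
Because the commit-async! callback is an ordinary two-argument function, it can be written and exercised without a running broker. The sketch below is only an illustration: `failed-commits` and `commit-callback` are hypothetical names, and the shape of the offsets map (topic-partition maps as keys) is an assumption, since the docstring only says "a map of topic-partitions to offsets".

```clojure
;; Hypothetical helper state, not part of the library:
;; collects one entry per failed commit.
(def failed-commits (atom []))

(defn commit-callback
  "Callback with the [offsets exception] shape described for commit-async!.
  Records failures instead of printing them."
  [offsets ^Throwable exception]
  (when exception
    (swap! failed-commits conj {:offsets offsets
                                :error   (.getMessage exception)})))

;; Exercise the callback directly. The offsets map shape is assumed.
(commit-callback {{:topic "my-topic" :partition 0} 42} nil)
(commit-callback {{:topic "my-topic" :partition 0} 43}
                 (ex-info "Commit failed" {}))
```

A function like this could then be passed as the second argument to commit-async! in place of the inline `fn` shown in the usage example.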

(consume! consumer
          {:keys [poll-timeout handler stop-fn]
           :or {poll-timeout 1000 stop-fn (constantly false)}})
Consume records continuously with a handler function.
The handler function receives each record map.
Options:
:poll-timeout - timeout for each poll in milliseconds (default: 1000)
:stop-fn - function that returns true when consumption should stop (default: never stop)
Usage:
(consume! consumer
          {:poll-timeout 1000
           :handler (fn [record]
                      (println "Received:" (:value record)))
           :stop-fn #(some-condition?)})

(create config)
Create a Kafka consumer with the given configuration map.
Example config:
{:bootstrap-servers "localhost:9092"
 :group-id "my-consumer-group"
 :key-deserializer "org.apache.kafka.common.serialization.StringDeserializer"
 :value-deserializer "org.apache.kafka.common.serialization.StringDeserializer"
 :auto-offset-reset "earliest"
 :enable-auto-commit true
 :schemas true} ; Enable automatic schema validation based on topic
Schema validation options:
- :schemas true - Auto-detect schema based on topic name (:topic/default or :topic)
- :schemas :my-schema - Use specific schema for all messages
- :schemas {"topic1" :schema1 "topic2" :schema2} - Per-topic schema mapping

(poll! consumer timeout-ms)
Poll for new records with the given timeout in milliseconds. Returns a seq of maps with keys: :topic :partition :offset :key :value :timestamp
If the consumer was created with a :schemas config, validation is performed automatically and invalid messages are filtered out of the result.
Usage:
(poll! consumer 1000) ; Poll for 1 second
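
The three :schemas forms and the filtering of invalid messages can be pictured with a small standalone sketch. It does not call the library and is not its implementation: `registry`, `resolve-schema-key`, and `valid-records` are hypothetical names, schemas are modelled as plain predicates on the record value, and passing records through when no schema is found is an assumption.

```clojure
;; Hypothetical registry: schema key -> predicate on a record's :value.
(def registry
  {:orders/default map?
   :events         string?})

(defn resolve-schema-key
  "Pick a schema key for `topic` given a :schemas option,
  which may be true, a keyword, or a map of topic name -> schema key."
  [schemas topic]
  (cond
    ;; true: try :topic/default first, then :topic
    (true? schemas)    (some #(when (contains? registry %) %)
                             [(keyword topic "default") (keyword topic)])
    ;; keyword: the same schema for every topic
    (keyword? schemas) schemas
    ;; map: per-topic lookup
    (map? schemas)     (get schemas topic)))

(defn valid-records
  "Keep only records whose :value satisfies the schema resolved for their topic.
  Assumption: records with no resolvable schema pass through unchanged."
  [schemas records]
  (filter (fn [{:keys [topic value]}]
            (if-let [valid? (get registry (resolve-schema-key schemas topic))]
              (valid? value)
              true))
          records))

(def sample-records
  [{:topic "orders" :value {:id 1}}
   {:topic "orders" :value "not-a-map"}
   {:topic "events" :value "created"}])
```

With `:schemas true`, `(valid-records true sample-records)` keeps the first and third records and drops the second, mirroring the way poll! is described as filtering out invalid messages.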

(seek! consumer topic partition offset)
Seek to a specific offset for a given topic partition.
Usage:
(seek! consumer "my-topic" 0 100) ; Seek to offset 100 in partition 0

(subscribe! consumer topics)
Subscribe to the given list of topics.
Usage:
(subscribe! consumer ["topic1" "topic2"])
(subscribe! consumer ["topic1"])