Kafka consumer functions with an idiomatic Clojure interface
(assign! consumer partition-maps)
Manually assign the consumer to specific topic partitions.
Usage: (assign! consumer [{:topic "my-topic" :partition 0} {:topic "my-topic" :partition 1}])
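When a consumer should take several partitions of one topic, the partition maps can be generated rather than typed out. The sketch below is plain Clojure; `partition-maps` is a hypothetical helper name, not part of this library, and it only builds the vector shape shown in the usage above.

```clojure
(defn partition-maps
  "Hypothetical helper: builds one {:topic .. :partition ..} map
  per partition number, in the shape shown in the assign! usage."
  [topic partitions]
  (mapv (fn [p] {:topic topic :partition p}) partitions))

;; Partitions 0, 1 and 2 of "my-topic"
(partition-maps "my-topic" (range 3))
```

The resulting vector could then be passed as the second argument to assign!.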
(close! consumer)
(close! consumer timeout-ms)
Close the consumer and release resources. Optionally specify timeout in milliseconds.
(commit-async! consumer)
(commit-async! consumer callback-fn)
Commit the current consumed offsets asynchronously. Optional callback function receives a map of topic-partitions to offsets and any exception.
Usage:
(commit-async! consumer)
(commit-async! consumer
  (fn [offsets exception]
    (when exception
      (println "Commit failed:" (.getMessage exception)))))
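A callback that only prints is easy to miss in production, so one option is to record failures somewhere they can be inspected later. The following is a minimal sketch in plain Clojure: `failure-recording-callback` is a hypothetical helper, and the only thing taken from the documentation is the `[offsets exception]` arity. The shape of the offsets map used in the demonstration is an assumption.

```clojure
(defn failure-recording-callback
  "Hypothetical helper: returns a callback with the documented
  [offsets exception] arity that appends each failure to the atom
  `failures` and ignores successful commits."
  [failures]
  (fn [offsets exception]
    (when exception
      (swap! failures conj {:offsets offsets
                            :message (.getMessage ^Throwable exception)}))))

(def failures (atom []))
(def callback (failure-recording-callback failures))

;; Calling the callback by hand to show its behavior.
;; The key shape of the offsets map is assumed for illustration.
(callback {{:topic "my-topic" :partition 0} 42} nil)
(callback {{:topic "my-topic" :partition 0} 42}
          (ex-info "broker unavailable" {}))

@failures ; one entry, for the second call only
```

In real use the function would be handed over as `(commit-async! consumer callback)`.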
(commit-sync! consumer)
Commit the current consumed offsets synchronously.
(consume! consumer
{:keys [poll-timeout handler stop-fn]
:or {poll-timeout 1000 stop-fn (constantly false)}})
Consume records continuously with a handler function. The handler function receives each record map.
Options:
:poll-timeout - timeout for each poll in milliseconds (default: 1000)
:stop-fn - function that returns true when consumption should stop (default: never stop)
Usage:
(consume! consumer
  {:poll-timeout 1000
   :handler (fn [record] (println "Received:" (:value record)))
   :stop-fn #(some-condition?)})
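The interaction between polling, the handler, and the stop function can be sketched as a small loop. This is not the library's implementation: `consume-loop` is a hypothetical name, `poll-fn` stands in for `(poll! consumer timeout-ms)` so the example runs without a broker, and checking `stop-fn` before each poll is an assumption.

```clojure
(defn consume-loop
  "Sketch of a poll/handle/stop loop. `poll-fn` is a hypothetical
  stand-in for (poll! consumer timeout-ms): it takes a timeout in
  milliseconds and returns a seq of record maps."
  [poll-fn {:keys [poll-timeout handler stop-fn]
            :or   {poll-timeout 1000 stop-fn (constantly false)}}]
  (loop []
    (when-not (stop-fn)                       ; assumed: checked before each poll
      (doseq [record (poll-fn poll-timeout)]
        (handler record))                     ; handler sees one record map at a time
      (recur))))

;; Fake data source: two batches of record maps, then nothing.
(def batches (atom [[{:value "a"} {:value "b"}] [{:value "c"}]]))
(def seen (atom []))

(consume-loop
  (fn [_timeout-ms]
    (let [batch (first @batches)]
      (swap! batches rest)
      batch))
  {:handler #(swap! seen conj (:value %))
   :stop-fn #(empty? @batches)})

@seen ; => ["a" "b" "c"]
```

Passing the stop condition as a function lets the caller flip an atom or check a flag from another thread without the loop knowing how the decision is made.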
(create config)
Create a Kafka consumer with the given configuration map.
Example config:
{:bootstrap-servers "localhost:9092"
 :group-id "my-consumer-group"
 :key-deserializer "org.apache.kafka.common.serialization.StringDeserializer"
 :value-deserializer "org.apache.kafka.common.serialization.StringDeserializer"
 :auto-offset-reset "earliest"
 :enable-auto-commit true
 :schemas true} ; Enable automatic schema validation based on topic
Schema validation options:
- :schemas true - Auto-detect schema based on topic name (:topic/default or :topic)
- :schemas :my-schema - Use specific schema for all messages
- :schemas {"topic1" :schema1 "topic2" :schema2} - Per-topic schema mapping
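The three forms of `:schemas` can be read as three ways of answering "which schema applies to this topic?". The sketch below illustrates that reading in plain Clojure. `schema-candidates` is a hypothetical function, not the library's code, and it assumes that `:topic/default` and `:topic` in the option description stand for keywords built from the actual topic name.

```clojure
(defn schema-candidates
  "Hypothetical helper: schema keys that could apply to `topic`,
  in lookup order, for a given :schemas config value."
  [schemas topic]
  (cond
    ;; true: derive keys from the topic name (assumed interpretation)
    (true? schemas)    [(keyword topic "default") (keyword topic)]
    ;; a single keyword: the same schema for every topic
    (keyword? schemas) [schemas]
    ;; a map: per-topic lookup, no schema for unlisted topics
    (map? schemas)     (if-let [schema (get schemas topic)] [schema] [])
    :else              []))

(schema-candidates true "orders")                  ; => [:orders/default :orders]
(schema-candidates :my-schema "orders")            ; => [:my-schema]
(schema-candidates {"topic1" :schema1} "topic1")   ; => [:schema1]
(schema-candidates {"topic1" :schema1} "other")    ; => []
```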
(poll! consumer timeout-ms)
Poll for new records with the given timeout in milliseconds. Returns a seq of maps with keys: :topic :partition :offset :key :value :timestamp
If the consumer was created with a :schemas config, records are validated automatically and invalid messages are filtered out of the result.
Usage: (poll! consumer 1000) ; Poll for 1 second
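Because each record is an ordinary map, a polled batch can be processed with core sequence functions. As one example, the sketch below computes, per topic partition, the offset that follows the highest offset in a batch, which is the position Kafka conventionally treats as "next to read". `next-offsets` is a hypothetical helper, and the sample records are made up to match the documented keys.

```clojure
(defn next-offsets
  "Hypothetical helper: for each [topic partition] pair in `records`,
  the offset following the highest offset seen."
  [records]
  (reduce (fn [acc {:keys [topic partition offset]}]
            (update acc [topic partition] (fnil max 0) (inc offset)))
          {}
          records))

;; Sample batch in the shape poll! is documented to return
(def sample-records
  [{:topic "my-topic" :partition 0 :offset 41 :key "k1" :value "v1" :timestamp 1700000000000}
   {:topic "my-topic" :partition 0 :offset 42 :key "k2" :value "v2" :timestamp 1700000000100}
   {:topic "my-topic" :partition 1 :offset 7  :key "k3" :value "v3" :timestamp 1700000000200}])

(next-offsets sample-records)
;; => {["my-topic" 0] 43, ["my-topic" 1] 8}
```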
(seek! consumer topic partition offset)
Seek to a specific offset for a given topic partition.
Usage: (seek! consumer "my-topic" 0 100) ; Seek to offset 100 in partition 0
(subscribe! consumer topics)
Subscribe to the given list of topics.
Usage:
(subscribe! consumer ["topic1" "topic2"])
(subscribe! consumer ["topic1"])