Kafka producer functions with an idiomatic Clojure interface.
(close! producer)
(close! producer timeout-ms)
Close the producer and release resources. Optionally specify timeout in milliseconds.
(create config)
Create a Kafka producer with the given configuration map.
Example config:
  {:bootstrap-servers "localhost:9092"
   :key-serializer "org.apache.kafka.common.serialization.StringSerializer"
   :value-serializer "org.apache.kafka.common.serialization.StringSerializer"
   :acks "all"
   :retries 3
   :schemas true} ; Enable automatic schema validation based on topic
Schema validation options:
- :schemas true - Auto-detect schema based on topic name (:topic/default or :topic)
- :schemas :my-schema - Use specific schema for all messages
- :schemas {"topic1" :schema1 "topic2" :schema2} - Per-topic schema mapping
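The three forms of :schemas can be read as a small dispatch on the option's type. The following is only an illustrative sketch, not the library's implementation: `schema-candidates` is a hypothetical helper, and the ordering of the auto-detected keys (:topic/default before :topic) is an assumption based on the order in which they are listed in the docstring.

```clojure
(defn schema-candidates
  "Hypothetical helper: returns the schema keys that could apply to `topic`
  for a given value of the :schemas option. Returns [] when nothing applies."
  [schemas-opt topic]
  (cond
    ;; :schemas true -> derive keys from the topic name,
    ;; e.g. \"orders\" gives :orders/default, then :orders
    (true? schemas-opt)    [(keyword topic "default") (keyword topic)]
    ;; :schemas :my-schema -> the same schema for every topic
    (keyword? schemas-opt) [schemas-opt]
    ;; :schemas {\"topic1\" :schema1 ...} -> look the topic up in the map
    (map? schemas-opt)     (if-some [k (get schemas-opt topic)] [k] [])
    ;; no :schemas option (or an unrecognised value) -> no validation
    :else                  []))
```

For example, `(schema-candidates true "orders")` yields the two keys an auto-detecting producer would plausibly try, while a per-topic map yields an empty vector for topics it does not mention.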
(flush! producer)
Flush any buffered records to the Kafka cluster. This method makes all buffered records immediately available to send.
(send! producer topic value)
(send! producer topic key value)
Send a message to a Kafka topic synchronously. Returns RecordMetadata on success, nil if validation fails.
If the producer was created with :schemas config, automatic validation will be performed.
Usage:
  (send! producer "my-topic" "key" "value")
  (send! producer "my-topic" nil "value-only") ; key can be nil
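Because send! returns nil when validation fails, callers may want to make that case explicit rather than pass a nil along. A minimal sketch, assuming only the documented return contract: `send-checked` is a hypothetical wrapper, and the send function is passed in as an argument so the sketch does not assume this namespace's name or alias.

```clojure
(defn send-checked
  "Hypothetical wrapper: calls `send-fn` (expected to behave like send!)
  and tags the outcome so that a validation failure is not silently ignored."
  [send-fn producer topic key value]
  (if-some [metadata (send-fn producer topic key value)]
    ;; non-nil return: the record was accepted and metadata is available
    {:status :sent :metadata metadata}
    ;; nil return: per the docstring, validation failed and nothing was sent
    {:status :rejected :topic topic}))
```

It would be called as `(send-checked send! producer "my-topic" "key" "value")`. The sketch does not catch exceptions; how send! reports broker errors is not stated in the docstring.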
(send-async! producer topic value)
(send-async! producer topic key value)
(send-async! producer topic key value callback-fn)
Send a message to a Kafka topic asynchronously with optional callback. Returns a Future, or nil if validation fails.
If the producer was created with :schemas config, automatic validation will be performed.
Callback function receives [metadata exception] where:
- metadata: RecordMetadata if successful, nil if error
- exception: Exception if error occurred, nil if successful
Usage:
  (send-async! producer "my-topic" "key" "value")
  (send-async! producer "my-topic" "key" "value"
    (fn [metadata exception]
      (if exception
        (println "Error:" (.getMessage exception))
        (println "Sent to partition:" (.partition metadata)))))
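One way to consume the [metadata exception] callback contract is to bridge it to a Clojure promise, so the outcome can be dereferenced later with a timeout. This is a sketch under the documented contract only: `promise-callback` and the :ok? / :error / :metadata result keys are hypothetical names, not part of the library.

```clojure
(defn promise-callback
  "Hypothetical helper: returns a map with a :callback suitable for
  send-async! and a :result promise that the callback delivers to."
  []
  (let [result (promise)]
    {:result   result
     :callback (fn [metadata exception]
                 ;; exactly one of metadata / exception is expected to be non-nil
                 (deliver result
                          (if exception
                            {:ok? false :error exception}
                            {:ok? true :metadata metadata})))}))
```

A caller could destructure the map with `(let [{:keys [result callback]} (promise-callback)] ...)`, pass `callback` as the last argument to send-async!, and later read the outcome with `(deref result 5000 :timeout)`.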