
kafka-metamorphosis.producer

Kafka producer functions with idiomatic Clojure interface


close!

(close! producer)
(close! producer timeout-ms)

Close the producer and release resources. Optionally specify timeout in milliseconds.
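One way to make sure close! always runs is to wrap producer usage in try/finally. The sketch below is only an illustration: call-with-resource is a hypothetical helper, not part of kafka-metamorphosis, and the broker address, topic, and 5000 ms timeout in the comment block are placeholder values.

```clojure
(defn call-with-resource
  "Opens a resource with `open`, passes it to `f`, and always calls `close`
  on it afterwards, even if `f` throws. Hypothetical helper, not part of
  kafka-metamorphosis."
  [open close f]
  (let [resource (open)]
    (try
      (f resource)
      (finally
        (close resource)))))

(comment
  ;; Assumes kafka-metamorphosis is on the classpath and a broker is
  ;; reachable at localhost:9092.
  (require '[kafka-metamorphosis.producer :as producer])

  (call-with-resource
    #(producer/create {:bootstrap-servers "localhost:9092"
                       :key-serializer    "org.apache.kafka.common.serialization.StringSerializer"
                       :value-serializer  "org.apache.kafka.common.serialization.StringSerializer"})
    #(producer/close! % 5000) ; two-argument arity: timeout in milliseconds
    (fn [p] (producer/send! p "my-topic" "key" "value"))))
```

The helper returns whatever `f` returns, so the result of send! is still available to the caller after the producer has been closed.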


create

(create config)

Create a Kafka producer with the given configuration map.

Example config:

  {:bootstrap-servers "localhost:9092"
   :key-serializer "org.apache.kafka.common.serialization.StringSerializer"
   :value-serializer "org.apache.kafka.common.serialization.StringSerializer"
   :acks "all"
   :retries 3
   :schemas true}  ; Enable automatic schema validation based on topic

Schema validation options:

  • :schemas true - Auto-detect schema based on topic name (:topic/default or :topic)
  • :schemas :my-schema - Use specific schema for all messages
  • :schemas {"topic1" :schema1 "topic2" :schema2} - Per-topic schema mapping
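The three :schemas forms listed above can be written out as plain config maps. This is a sketch that reuses the values from the example config; the var names are illustrative, and :my-schema, :schema1, :schema2 and the topic names are the docstring's own placeholders rather than schemas known to exist.

```clojure
(def base-config
  {:bootstrap-servers "localhost:9092"
   :key-serializer    "org.apache.kafka.common.serialization.StringSerializer"
   :value-serializer  "org.apache.kafka.common.serialization.StringSerializer"
   :acks              "all"
   :retries           3})

;; Auto-detect the schema from the topic name.
(def auto-schema-config (assoc base-config :schemas true))

;; One schema for every message; :my-schema is a placeholder id.
(def single-schema-config (assoc base-config :schemas :my-schema))

;; A different schema per topic; topic names and schema ids are placeholders.
(def per-topic-config
  (assoc base-config :schemas {"topic1" :schema1
                               "topic2" :schema2}))

(comment
  ;; Assumes kafka-metamorphosis is on the classpath and a broker is running.
  (require '[kafka-metamorphosis.producer :as producer])
  (def p (producer/create per-topic-config)))
```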

flush!

(flush! producer)

Flush any buffered records to the Kafka cluster. This method makes all buffered records immediately available to send.


send!

(send! producer topic value)
(send! producer topic key value)

Send a message to a Kafka topic synchronously. Returns RecordMetadata on success, nil if validation fails.

If the producer was created with :schemas config, automatic validation will be performed.

Usage:

  (send! producer "my-topic" "key" "value")
  (send! producer "my-topic" nil "value-only") ; key can be nil
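Because send! signals a validation failure by returning nil rather than throwing, callers may want to check the result explicitly. The sketch below shows one possible approach: send-or-throw! is a hypothetical helper, not part of kafka-metamorphosis, that turns a nil result into an exception so a rejected message is not silently ignored.

```clojure
(defn send-or-throw!
  "Calls the zero-argument `send-fn` and returns its result. Throws an
  ex-info if the result is nil, which send! documents as the return value
  for a validation failure. Hypothetical helper."
  [send-fn]
  (or (send-fn)
      (throw (ex-info "Schema validation failed; message not sent"
                      {:reason :validation-failed}))))

(comment
  ;; Assumes kafka-metamorphosis is on the classpath and that `p` is a
  ;; producer created with a :schemas config.
  (require '[kafka-metamorphosis.producer :as producer])

  (let [metadata (send-or-throw! #(producer/send! p "my-topic" "key" "value"))]
    (println "Sent to partition:" (.partition metadata))))
```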


send-async!

(send-async! producer topic value)
(send-async! producer topic key value)
(send-async! producer topic key value callback-fn)

Send a message to a Kafka topic asynchronously with optional callback. Returns a Future, or nil if validation fails.

If the producer was created with :schemas config, automatic validation will be performed.

Callback function receives [metadata exception] where:

  • metadata: RecordMetadata if successful, nil if error
  • exception: Exception if error occurred, nil if successful

Usage:

  (send-async! producer "my-topic" "key" "value")
  (send-async! producer "my-topic" "key" "value"
               (fn [metadata exception]
                 (if exception
                   (println "Error:" (.getMessage exception))
                   (println "Sent to partition:" (.partition metadata)))))
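When the caller needs the outcome of an asynchronous send as a value, the callback can deliver it to a promise. The following is a minimal sketch, assuming only the [metadata exception] callback contract described above; promise-callback is a hypothetical helper, not part of kafka-metamorphosis, and the 5000 ms wait is an arbitrary choice.

```clojure
(defn promise-callback
  "Returns [result callback]. `callback` takes [metadata exception] and
  delivers {:ok metadata} or {:error exception} to the promise `result`.
  Hypothetical helper, not part of kafka-metamorphosis."
  []
  (let [result (promise)]
    [result
     (fn [metadata exception]
       (deliver result (if exception
                         {:error exception}
                         {:ok metadata})))]))

(comment
  ;; Assumes kafka-metamorphosis is on the classpath and `p` is an open producer.
  (require '[kafka-metamorphosis.producer :as producer])

  (let [[result callback] (promise-callback)]
    (if (producer/send-async! p "my-topic" "key" "value" callback)
      ;; Wait up to 5000 ms for the callback to fire.
      (deref result 5000 {:error :timeout})
      ;; send-async! returned nil, which the docstring reserves for
      ;; validation failure.
      {:error :validation-failed})))
```

Checking the return value of send-async! first matters here: if validation fails and nil is returned, the callback may never be invoked, so waiting on the promise without a timeout could block indefinitely.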

