Clojure interface for the Kafka Producer API. For complete JavaDocs, see: https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/package-summary.html
(close producer)
(close producer timeout-ms)
Like `.close`, but with a default time unit of ms for the arity with timeout.
See:
- https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close()
- https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close(java.time.Duration)
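The timeout arity takes a plain number of milliseconds, while the linked Java method takes a `java.time.Duration`. The conversion can be sketched as follows. The helper name `timeout->duration` is hypothetical and not part of this namespace, and the library's actual implementation may differ.

```clojure
(import 'java.time.Duration)

;; Hypothetical helper (not part of the library): converts a timeout given
;; in milliseconds into the java.time.Duration accepted by KafkaProducer.close.
(defn timeout->duration
  [timeout-ms]
  (Duration/ofMillis (long timeout-ms)))

(.toMillis (timeout->duration 5000))
;; => 5000
```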
(flush producer)
See: https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#flush()
(metrics producer)
Returns a sequence of maps representing all the producer's internal metrics. Each map contains the metric group (:group), metric name (:name), metric description (:description), metric tags (:tags) and metric value (:value).
Usage:
(metrics producer)
;; => [{:group "producer-metrics",
;;      :name "record-queue-time-avg",
;;      :description "The average time in ms record batches spent in the record accumulator.",
;;      :tags {"client-id" "producer-2"},
;;      :value 0.1}
;;     {:group "producer-metrics",
;;      :name "outgoing-byte-rate",
;;      :description "The average number of outgoing bytes sent per second to all servers.",
;;      :tags {"client-id" "producer-2"},
;;      :value 31.668376965849703}
;;     {:group "producer-node-metrics",
;;      :name "response-rate",
;;      :description "The average number of responses received per second.",
;;      :tags {"client-id" "producer-2", "node-id" "node-3"},
;;      :value 0.23866348448687352}]
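Because the result is plain Clojure data, individual metrics can be picked out with ordinary sequence functions. The sketch below runs against hand-written sample data in the documented shape rather than a live producer. The helper `metric-values` is hypothetical and not part of the library.

```clojure
;; Sample data in the shape documented above; the values are illustrative.
(def sample-metrics
  [{:group "producer-metrics"
    :name "record-queue-time-avg"
    :description "The average time in ms record batches spent in the record accumulator."
    :tags {"client-id" "producer-2"}
    :value 0.1}
   {:group "producer-node-metrics"
    :name "response-rate"
    :description "The average number of responses received per second."
    :tags {"client-id" "producer-2", "node-id" "node-3"}
    :value 0.23866348448687352}])

;; Hypothetical helper: returns the values of every metric with the given
;; group and name (there may be several, for example one per node).
(defn metric-values
  [metrics group metric-name]
  (->> metrics
       (filter #(and (= group (:group %)) (= metric-name (:name %))))
       (mapv :value)))

(metric-values sample-metrics "producer-metrics" "record-queue-time-avg")
;; => [0.1]
```

With a live producer, `sample-metrics` would be replaced by the result of `(metrics producer)`.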
(partitions producer topic)
Returns a sequence of maps which represent information about each partition of the specified topic.
Usage:
(partitions producer "topic-a")
;; => [{:topic "topic-a",
;;      :partition 2,
;;      :leader {:id 2, :host "172.17.0.3", :port 9093},
;;      :replicas [{:id 2, :host "172.17.0.3", :port 9093}
;;                 {:id 3, :host "172.17.0.5", :port 9094}],
;;      :in-sync-replicas [{:id 2, :host "172.17.0.3", :port 9093}
;;                         {:id 3, :host "172.17.0.5", :port 9094}]}
;;     {:topic "topic-a",
;;      :partition 1,
;;      :leader {:id 1, :host "172.17.0.4", :port 9092},
;;      :replicas [{:id 1, :host "172.17.0.4", :port 9092}
;;                 {:id 2, :host "172.17.0.3", :port 9093}],
;;      :in-sync-replicas [{:id 1, :host "172.17.0.4", :port 9092}
;;                         {:id 2, :host "172.17.0.3", :port 9093}]}]
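One possible use of this data is spotting partitions whose in-sync replica set is smaller than the full replica set. The sketch below works on made-up sample data in the documented shape (partition 2 has deliberately been given a shortened `:in-sync-replicas` list). The helper `under-replicated` is hypothetical and not part of the library.

```clojure
;; Illustrative data in the documented shape. Partition 2 is missing one
;; in-sync replica on purpose so that the helper has something to find.
(def sample-partitions
  [{:topic "topic-a"
    :partition 2
    :leader {:id 2 :host "172.17.0.3" :port 9093}
    :replicas [{:id 2 :host "172.17.0.3" :port 9093}
               {:id 3 :host "172.17.0.5" :port 9094}]
    :in-sync-replicas [{:id 2 :host "172.17.0.3" :port 9093}]}
   {:topic "topic-a"
    :partition 1
    :leader {:id 1 :host "172.17.0.4" :port 9092}
    :replicas [{:id 1 :host "172.17.0.4" :port 9092}
               {:id 2 :host "172.17.0.3" :port 9093}]
    :in-sync-replicas [{:id 1 :host "172.17.0.4" :port 9092}
                       {:id 2 :host "172.17.0.3" :port 9093}]}])

;; Hypothetical helper: returns the partition numbers that have fewer
;; in-sync replicas than assigned replicas.
(defn under-replicated
  [partitions]
  (->> partitions
       (filter #(< (count (:in-sync-replicas %)) (count (:replicas %))))
       (mapv :partition)))

(under-replicated sample-partitions)
;; => [2]
```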
(producer config)
(producer config key-serializer value-serializer)
Takes a map of config options and returns a `KafkaProducer` for publishing records to Kafka.
NOTE: `KafkaProducer` instances are thread-safe and should generally be shared for best performance.
For more information and available config options, see:
- https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
- https://kafka.apache.org/documentation.html#producerconfigs
Usage:
;; Created using just a map of configs. In this case the keys
;; bootstrap.servers, value.serializer and key.serializer are required.
(producer {"bootstrap.servers" "localhost:9092"
           "value.serializer" "org.apache.kafka.common.serialization.StringSerializer"
           "key.serializer" "org.apache.kafka.common.serialization.StringSerializer"})

;; Created using a map of configs and the serializers for keys and values.
(producer {"bootstrap.servers" "localhost:9092"} (string-serializer) (string-serializer))

;; A KafkaProducer should be closed when it is no longer needed. Because it
;; is closeable, it can be used with the with-open macro.
(def config {"bootstrap.servers" "localhost:9092"})
(with-open [p (producer config (string-serializer) (string-serializer))]
  (-> (send p (record "topic-a" "Hello World"))
      (.get)))
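Since the single-argument arity needs `bootstrap.servers`, `key.serializer` and `value.serializer` in the config map, it can be handy to check a map before constructing a producer. The following is only a sketch: `required-keys` and `missing-config-keys` are hypothetical names, and the library itself is not known to perform this check.

```clojure
;; The keys the single-argument arity requires, per the usage notes above.
(def required-keys
  ["bootstrap.servers" "key.serializer" "value.serializer"])

;; Hypothetical helper: returns the required keys absent from the config map.
(defn missing-config-keys
  [config]
  (filterv #(not (contains? config %)) required-keys))

(missing-config-keys {"bootstrap.servers" "localhost:9092"})
;; => ["key.serializer" "value.serializer"]
```

An empty result suggests the map is complete enough for `(producer config)`. A non-empty one suggests passing serializers explicitly through the three-argument arity instead.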
(record topic value)
(record topic key value)
(record topic partition key value)
Returns a record that can be published to Kafka using `send`.
(send producer record)
(send producer record callback)
Asynchronously sends a record to Kafka. Returns a `Future` of a map with `:topic`, `:partition` and `:offset` keys. Optionally accepts a callback fn that is called when the operation completes. The callback should be a fn of two arguments: a map as above, and an exception. The exception is nil if the operation succeeded.
See: https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord)
Usage:
;; To send the message asynchronously and return a Future
(send producer (record "topic-a" "Test message 1"))
;; => #object[string representation of future object]

;; To send the message synchronously, deref the returned Future
@(send producer (record "topic-a" "Test message 2"))
;; => {:topic "topic-a", :partition 4, :offset 0}

;; To send the message asynchronously and provide a callback.
;; This also returns the Future.
(send producer (record "topic-a" "Test message 3") #(println "Metadata->" %1 "Exception->" %2))
;; => #object[string representation of future object]
;; Metadata-> {:topic topic-a, :partition 4, :offset 1} Exception-> nil
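A callback usually needs to branch on whether the exception argument is nil. The sketch below defines such a two-argument fn and calls it directly with made-up arguments, so it runs without a broker. The names `results` and `record-result` are hypothetical, and passing nil metadata on failure is an assumption made for the simulation rather than documented behaviour.

```clojure
;; Collects the outcome of each send; purely illustrative.
(def results (atom []))

;; Hypothetical callback: a fn of two arguments, the metadata map and an
;; exception, where the exception is nil when the send succeeded.
(defn record-result
  [metadata exception]
  (if exception
    (swap! results conj {:status :failed :error (ex-message exception)})
    (swap! results conj {:status :ok :offset (:offset metadata)})))

;; Simulate one successful and one failed completion by calling the
;; callback directly. The nil metadata on failure is an assumption.
(record-result {:topic "topic-a" :partition 4 :offset 1} nil)
(record-result nil (ex-info "broker unavailable" {}))

@results
;; => [{:status :ok, :offset 1} {:status :failed, :error "broker unavailable"}]
```

With a live producer, the same fn would be passed as the third argument, as in `(send producer (record "topic-a" "Test message 3") record-result)`.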