Clojure interface for the Kafka Producer API. For complete JavaDocs, see: http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/package-summary.html
(close producer)
(close producer timeout-ms)
Like `.close`, but with a default time unit of ms for the arity with timeout.
See:
- http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close()
- http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close(long,%20java.util.concurrent.TimeUnit)
(flush producer)
See: http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#flush()
(metrics producer)
Returns a sequence of maps representing all the producer's internal metrics.
Each map contains information about metric-group (:group), metric-name (:name),
metric-description (:description), metric-tags (:tags) and metric-value (:value).
Usage:
(metrics producer)
;; => [{:group "producer-metrics",
;;      :name "record-queue-time-avg",
;;      :description "The average time in ms record batches spent in the record accumulator.",
;;      :tags {"client-id" "producer-2"},
;;      :value 0.1}
;;     {:group "producer-metrics",
;;      :name "outgoing-byte-rate",
;;      :description "The average number of outgoing bytes sent per second to all servers.",
;;      :tags {"client-id" "producer-2"},
;;      :value 31.668376965849703}
;;     {:group "producer-node-metrics",
;;      :name "response-rate",
;;      :description "The average number of responses received per second.",
;;      :tags {"client-id" "producer-2", "node-id" "node-3"},
;;      :value 0.23866348448687352}]
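Because the result is plain Clojure data, ordinary sequence functions are enough to pick out individual metrics. The sketch below runs against a hand-written sample shaped like the output above rather than a live producer; `metric-value` and `names-by-group` are hypothetical helper names, not part of this library.

```clojure
;; Sample data copied from the usage output above; with a live producer
;; this would come from (metrics producer).
(def sample-metrics
  [{:group "producer-metrics"
    :name "record-queue-time-avg"
    :description "The average time in ms record batches spent in the record accumulator."
    :tags {"client-id" "producer-2"}
    :value 0.1}
   {:group "producer-metrics"
    :name "outgoing-byte-rate"
    :description "The average number of outgoing bytes sent per second to all servers."
    :tags {"client-id" "producer-2"}
    :value 31.668376965849703}
   {:group "producer-node-metrics"
    :name "response-rate"
    :description "The average number of responses received per second."
    :tags {"client-id" "producer-2", "node-id" "node-3"}
    :value 0.23866348448687352}])

;; Hypothetical helper: value of the first metric matching group and name,
;; or nil when no such metric exists.
(defn metric-value
  [metrics group metric-name]
  (->> metrics
       (filter #(and (= group (:group %)) (= metric-name (:name %))))
       first
       :value))

;; Hypothetical helper: metric names collected per metric group.
(defn names-by-group
  [metrics]
  (reduce (fn [acc m]
            (update acc (:group m) (fnil conj []) (:name m)))
          {}
          metrics))

(metric-value sample-metrics "producer-metrics" "outgoing-byte-rate")
;; => 31.668376965849703
```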
(partitions producer topic)
Returns a sequence of maps which represent information about each partition of the
specified topic.
Usage:
(partitions producer "topic-a")
;; => [{:topic "topic-a",
;;      :partition 2,
;;      :leader {:id 2, :host "172.17.0.3", :port 9093},
;;      :replicas [{:id 2, :host "172.17.0.3", :port 9093}
;;                 {:id 3, :host "172.17.0.5", :port 9094}],
;;      :in-sync-replicas [{:id 2, :host "172.17.0.3", :port 9093}
;;                         {:id 3, :host "172.17.0.5", :port 9094}]}
;;     {:topic "topic-a",
;;      :partition 1,
;;      :leader {:id 1, :host "172.17.0.4", :port 9092},
;;      :replicas [{:id 1, :host "172.17.0.4", :port 9092}
;;                 {:id 2, :host "172.17.0.3", :port 9093}],
;;      :in-sync-replicas [{:id 1, :host "172.17.0.4", :port 9092}
;;                         {:id 2, :host "172.17.0.3", :port 9093}]}]
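The partition maps can be queried the same way as any other Clojure data. As a minimal sketch, assuming the shape shown above, the following looks up the leader of each partition and lists partitions whose in-sync replica set is smaller than the full replica set. `leaders-by-partition` and `under-replicated` are hypothetical helpers, and the sample is hand-copied from the usage output instead of coming from a live cluster.

```clojure
;; Sample data copied from the usage output above; with a live producer
;; this would come from (partitions producer "topic-a").
(def sample-partitions
  [{:topic "topic-a"
    :partition 2
    :leader {:id 2, :host "172.17.0.3", :port 9093}
    :replicas [{:id 2, :host "172.17.0.3", :port 9093}
               {:id 3, :host "172.17.0.5", :port 9094}]
    :in-sync-replicas [{:id 2, :host "172.17.0.3", :port 9093}
                       {:id 3, :host "172.17.0.5", :port 9094}]}
   {:topic "topic-a"
    :partition 1
    :leader {:id 1, :host "172.17.0.4", :port 9092}
    :replicas [{:id 1, :host "172.17.0.4", :port 9092}
               {:id 2, :host "172.17.0.3", :port 9093}]
    :in-sync-replicas [{:id 1, :host "172.17.0.4", :port 9092}
                       {:id 2, :host "172.17.0.3", :port 9093}]}])

;; Hypothetical helper: sorted map of partition number -> leader node.
(defn leaders-by-partition
  [partition-infos]
  (into (sorted-map)
        (map (juxt :partition :leader))
        partition-infos))

;; Hypothetical helper: partition numbers with fewer in-sync replicas
;; than assigned replicas.
(defn under-replicated
  [partition-infos]
  (->> partition-infos
       (filter #(< (count (:in-sync-replicas %)) (count (:replicas %))))
       (mapv :partition)))

(leaders-by-partition sample-partitions)
;; => {1 {:id 1, :host "172.17.0.4", :port 9092},
;;     2 {:id 2, :host "172.17.0.3", :port 9093}}

(under-replicated sample-partitions)
;; => []
```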
(producer config)
(producer config key-serializer value-serializer)
Takes a map of config options and returns a `KafkaProducer` for publishing records to Kafka.
NOTE: `KafkaProducer` instances are thread-safe and should generally be shared for best performance.
For more information and available config options, see:
- http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
- http://kafka.apache.org/documentation.html#producerconfigs
Usage:
;; Created using just a map of configs. In this case the keys
;; bootstrap.servers, value.serializer and key.serializer are required.
(producer {"bootstrap.servers" "localhost:9092"
           "value.serializer" "org.apache.kafka.common.serialization.StringSerializer"
           "key.serializer" "org.apache.kafka.common.serialization.StringSerializer"})
;; Created using a map of configs and the serializers for keys and values.
(producer {"bootstrap.servers" "localhost:9092"} (string-serializer) (string-serializer))
;; A KafkaProducer should be closed when it is no longer used. Because it
;; is closeable, it can be used in the with-open macro.
(def config {"bootstrap.servers" "localhost:9092"})
(with-open [p (producer config (string-serializer) (string-serializer))]
  (-> (send p (record "topic-a" "Hello World"))
      (.get)))
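Since the config is an ordinary map, it can be assembled and checked before it is handed to `producer`. The following is a minimal sketch, assuming the single-argument arity and the three required keys named in the usage comment above; `string-producer-config` and `missing-required-keys` are hypothetical helpers, not functions of this library.

```clojure
(require '[clojure.set :as set])

;; Keys the usage comment above names as required when only a config map
;; is passed.
(def required-keys
  #{"bootstrap.servers" "key.serializer" "value.serializer"})

(def string-serializer-class
  "org.apache.kafka.common.serialization.StringSerializer")

;; Hypothetical helper: fills in String serializers for keys and values
;; unless the caller overrides them.
(defn string-producer-config
  [overrides]
  (merge {"key.serializer" string-serializer-class
          "value.serializer" string-serializer-class}
         overrides))

;; Hypothetical helper: set of required keys absent from the config.
(defn missing-required-keys
  [config]
  (set/difference required-keys (set (keys config))))

(empty? (missing-required-keys
         (string-producer-config {"bootstrap.servers" "localhost:9092"})))
;; => true
```

Checking the map up front turns a missing key into a readable set of names before any producer is constructed.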
(record topic value)
(record topic key value)
(record topic partition key value)
Returns a record that can be published to Kafka using [[send]].
(send producer record)
(send producer record callback)
Asynchronously sends a record to Kafka. Returns a `Future` of a map
with `:topic`, `:partition` and `:offset` keys. Optionally takes
a callback fn that is called when the operation completes.
The callback should be a fn of two arguments: a map as above and an
exception. The exception is nil if the operation succeeded.
See: http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord)
Usage:
;; To send the message asynchronously and return a Future
(send producer (record "topic-a" "Test message 1"))
;; => #object[string representation of future object]
;; To send the message synchronously, deref the returned Future
@(send producer (record "topic-a" "Test message 2"))
;; => {:topic "topic-a", :partition 4, :offset 0}
;; To send the message asynchronously and provide a callback.
;; This also returns the Future.
(send producer (record "topic-a" "Test message 3") #(println "Metadata->" %1 "Exception->" %2))
;; => #object[string representation of future object]
;; Metadata-> {:topic topic-a, :partition 4, :offset 1} Exception-> nil
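The two-argument callback contract and the blocking deref can be tried without a broker. The sketch below is an illustration under stated assumptions: `describe-result` is a hypothetical function with the same argument shape as the callback described above, and `fake-send-result` is a plain Clojure `future` standing in for the value returned by `send`. The three-argument form of `deref` is standard Clojure and accepts any `java.util.concurrent.Future`, so it gives a bounded wait instead of blocking indefinitely.

```clojure
;; Hypothetical function with the callback's argument shape: metadata map
;; first, exception second (nil on success).
(defn describe-result
  [metadata exception]
  (if exception
    (str "send failed: " (.getMessage ^Throwable exception))
    (str "sent to " (:topic metadata)
         ", partition " (:partition metadata)
         ", offset " (:offset metadata))))

;; A callback suitable for passing as the third argument to send.
(def print-callback
  #(println (describe-result %1 %2)))

(describe-result {:topic "topic-a", :partition 4, :offset 1} nil)
;; => "sent to topic-a, partition 4, offset 1"

;; Stand-in for the Future returned by send, so the sketch runs without
;; a Kafka broker.
(def fake-send-result
  (future {:topic "topic-a", :partition 4, :offset 0}))

;; Wait at most 5000 ms; :timed-out is returned if the Future has not
;; completed by then.
(deref fake-send-result 5000 :timed-out)
;; => {:topic "topic-a", :partition 4, :offset 0}
```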