The Jackdaw Client API wraps the core Kafka Producer
1 and
Consumer
2 APIs and provides functions for building or
unpacking some of the supporting objects like Callbacks, Serdes, ConsumerRecords etc.
The producer example below demonstrates how to use the Kafka Producer API. The configuration3
is represented as a simple map (Jackdaw will convert this to a Properties
object), and the
key and value serializers are specified when creating the producer to override the default
(which would be the StringSerializer
).
Producers are usually created using the with-open
macro so that they are automatically
closed either when evaluation reaches the end of the body or an exception is thrown.
Within the body, the jc/produce!
function is used to request a write to the specified
Kafka topic. This function returns a delay immediately which can be deref
'd to wait
for the result of the Kafka .send
call which includes metadata like the timestamp
and offset of the written record.
In this example, the JSON serde converts the message from a plain old Clojure map into a
JSON byte-array while building the ProducerRecord that is eventually passed to the
KafkaProducer's .send
method. Other Serdes are available4
(ns producer-example
(:require
[jackdaw.serdes.json :as json]
[jackdaw.client :as jc])
(:import
(org.apache.kafka.common.serialization Serdes)))
(def producer-config
{"bootstrap.servers" "localhost:9092"
"acks" "all"
"client.id" "com.foo.my-producer"})
(with-open [my-producer (jc/producer producer-config (Serdes/IntegerSerde) (json/serde))]
(jc/produce! my-producer "foo" 1 {:id 1, :payload "hi mom!"}))
The consumer example below demonstrates how to use the Kafka Consumer API. The configuration5
is represented as a simple map (Jackdaw will convert this to a Properties
object), and the
key and value serializers are specified when creating the consumer to override the default
(which would be the StringSerializer
).
Consumers are usually created using the with-open
macro so that they are automatically
closed either when evaluation reaches the end of the body or an exception is thrown. In this
example, we subscribe to the "foo" topic immediately after creating the consumer using
jc/subscribe
.
The jackdaw.client.log/log
function takes a consumer instance that has already been subscribed
to one or more topics, and returns a lazy infinite sequence of "datafied" records in the order
that they were received by calls to the Consumer's .poll
method. In this example, we just
write the record to standard out to demonstrate the keys that are available in each record. To
see what other keys are available, see data/consumer.clj6
(ns consumer-example
(:require
[jackdaw.serdes.json :as json]
[jackdaw.client :as jc]
[jackdaw.client.log :as jl])
(:import
(org.apache.kafka.common.serialization Serdes)))
(def consumer-config
{"bootstrap.servers" "localhost:9092"
"group.id" "com.foo.my-consumer"})
(def topic-foo
{:topic-name "foo"})
(with-open [my-consumer (-> (jc/consumer consumer-config (Serdes/IntegerSerde) (json/serde))
(jc/subscribe [topic-foo]))]
(doseq [{:keys [key value partition timestamp offset]} (jl/log my-consumer)]
(println "key: " key)
(println "value: " value)
(println "partition: " partition)
(println "timestamp: " timestamp)
(println "offset: " offset)))
1: https://kafka.apache.org/documentation/#producerapi
2: https://kafka.apache.org/documentation/#consumerapi
3: https://kafka.apache.org/documentation/#producerconfigs
4: https://github.com/FundingCircle/jackdaw/blob/master/src/jackdaw/serdes
5: https://kafka.apache.org/documentation/#consumerconfigs
6: https://github.com/FundingCircle/jackdaw/blob/master/src/jackdaw/data/consumer.clj
Can you improve this documentation?Edit on GitHub
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close