This Apache Kafka client library is a Clojure wrapper for the official Java libraries. It strives to be idiomatic without being too clever. Users familiar with the Java libraries will feel right at home, although familiarity with them is not a prerequisite.
It provides namespaces for consumers, producers, and administration. We are also pleased to announce that Kafka Streams is fully supported.
Ready for Kafka 2.0.0.
First, read the fairly detailed API documentation, especially if you are not familiar with the Java libraries and their concepts.
Then, have a look at the following examples. To begin, let us require all the namespaces involved.
;; For Kafka :
(require '[dvlopt.kafka       :as K]
         '[dvlopt.kafka.admin :as K.admin]
         '[dvlopt.kafka.in    :as K.in]
         '[dvlopt.kafka.out   :as K.out])
;; For Kafka Streams :
(require '[dvlopt.kstreams          :as KS]
         '[dvlopt.kstreams.topology :as KS.topology]
         '[dvlopt.kstreams.ctx      :as KS.ctx]
         '[dvlopt.kstreams.store    :as KS.store]
         '[dvlopt.kstreams.builder  :as KS.builder]
         '[dvlopt.kstreams.stream   :as KS.stream]
         '[dvlopt.kstreams.table    :as KS.table])
Creating topic "my-topic" using the dvlopt.kafka.admin namespace.
(with-open [admin (K.admin/admin)]
  (K.admin/create-topics admin
                         {"my-topic" {::K.admin/number-of-partitions 4
                                      ::K.admin/replication-factor   3
                                      ::K.admin/configuration        {"cleanup.policy" "compact"}}})
  (println "Existing topics : " (keys @(K.admin/topics admin
                                                       {::K/internal? false}))))
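The admin example asks for a replication factor of 3, which Kafka only accepts when the cluster has at least three brokers; against a single local broker, topic creation fails. If you want to build the topic map programmatically, here is a minimal sketch. The `topic-specs` helper is hypothetical and not part of the library. It only produces the same kind of map that `K.admin/create-topics` receives above, with the replication factor capped at the broker count. The keys are written fully qualified so the snippet runs without requiring the library.

```clojure
;; Hypothetical helper, NOT part of dvlopt.kafka.admin: builds a map of
;; topic name -> topic options using the same keys as the admin example.
;; Keywords are fully qualified so this runs without the library loaded.
(defn topic-specs
  "Returns a map of topic name -> options, capping the replication factor
   at the number of available brokers."
  [topic-names broker-count]
  (zipmap topic-names
          (repeat {:dvlopt.kafka.admin/number-of-partitions 4
                   :dvlopt.kafka.admin/replication-factor   (min 3 broker-count)
                   :dvlopt.kafka.admin/configuration        {"cleanup.policy" "compact"}})))

;; With a single local broker, the replication factor drops to 1.
(get-in (topic-specs ["my-topic"] 1)
        ["my-topic" :dvlopt.kafka.admin/replication-factor])
;; => 1
```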
Sending 25 records to "my-topic" using the dvlopt.kafka.out namespace.
(with-open [producer (K.out/producer {::K/nodes             [["localhost" 9092]]
                                      ::K/serializer.key    (K/serializers :long)
                                      ::K/serializer.value  :long
                                      ::K.out/configuration {"client.id" "my-producer"}})]
  (doseq [i (range 25)]
    (K.out/send producer
                {::K/topic "my-topic"
                 ::K/key   i
                 ::K/value (* 100 i)}
                (fn callback [exception metadata]
                  (println (format "Record %d : %s"
                                   i
                                   (if exception
                                     "FAILURE"
                                     "SUCCESS")))))))
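The producer above uses `:long` serializers for both keys and values. Assuming these correspond to Kafka's own `LongSerializer` and `LongDeserializer` (the wrapper's internals are not shown here), each long travels over the wire as 8 big-endian bytes. The standalone sketch below reproduces that encoding with `java.nio.ByteBuffer` only; `long->bytes` and `bytes->long` are illustrative names, not library functions.

```clojure
(import 'java.nio.ByteBuffer)

;; Illustrative functions, not part of dvlopt.kafka. They mimic the
;; encoding assumed for :long, i.e. Kafka's LongSerializer:
;; 8 bytes, big-endian (ByteBuffer's default byte order).
(defn long->bytes
  [^long n]
  (.array (.putLong (ByteBuffer/allocate 8) n)))

(defn bytes->long
  [^bytes bs]
  (.getLong (ByteBuffer/wrap bs)))

;; The record with key 3 carries the value 300 (0x012C).
(vec (long->bytes 300))
;; => [0 0 0 0 0 0 1 44]
```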
Reading a batch of records from "my-topic" and manually committing the offset of where we are, using the dvlopt.kafka.in namespace.
(with-open [consumer (K.in/consumer {::K/nodes              [["localhost" 9092]]
                                     ::K/deserializer.key   :long
                                     ::K/deserializer.value :long
                                     ::K.in/configuration   {"auto.offset.reset"  "earliest"
                                                             "enable.auto.commit" false
                                                             "group.id"           "my-group"}})]
  (K.in/register-for consumer
                     ["my-topic"])
  (doseq [record (K.in/poll consumer
                            {::K/timeout [5 :seconds]})]
    (println (format "Record %d @%d - Key = %d, Value = %d"
                     (::K/offset record)
                     (::K/timestamp record)
                     (::K/key record)
                     (::K/value record))))
  (K.in/commit-offsets consumer))
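With `enable.auto.commit` set to false, progress is only saved when offsets are committed explicitly. Whatever the wrapper does internally, Kafka's convention is that the committed offset of a partition is the offset of the *next* record to consume, i.e. the last processed offset plus one. The sketch below shows that computation for a batch. It does not touch the library: records are plain maps with hypothetical `:topic`, `:partition` and `:offset` keys, and `offsets-to-commit` is an illustrative name.

```clojure
;; Hypothetical helper, NOT part of dvlopt.kafka.in. Records are plain
;; maps with :topic, :partition and :offset keys (an assumption for this
;; sketch). Kafka expects the committed offset to be the next offset to
;; read, hence (inc offset); the highest one per partition wins.
(defn offsets-to-commit
  [records]
  (reduce (fn [acc {:keys [topic partition offset]}]
            (update acc
                    [topic partition]
                    (fnil max -1)
                    (inc offset)))
          {}
          records))

(offsets-to-commit [{:topic "my-topic" :partition 0 :offset 7}
                    {:topic "my-topic" :partition 0 :offset 8}
                    {:topic "my-topic" :partition 2 :offset 3}])
;; => {["my-topic" 0] 9, ["my-topic" 2] 4}
```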
A useless but simple example: grouping records into two categories based on their key, "odd" and "even", and continuously summing the values in each category.
First, we create a topology. We then add a source node fetching records from "my-topic". Those records are processed by "my-processor", which needs "my-store" in order to persist the current sum for each category. Finally, a sink node receives the processed records and sends them to "my-topic-2".
(def topology
  (-> (KS.topology/topology)
      (KS.topology/add-source "my-source"
                              ["my-topic"]
                              {::K/deserializer.key   :long
                               ::K/deserializer.value :long
                               ::KS/offset-reset      :earliest})
      (KS.topology/add-processor "my-processor"
                                 ["my-source"]
                                 {::KS/processor.init      (fn [ctx]
                                                             (KS.ctx/kv-store ctx
                                                                              "my-store"))
                                  ::KS/processor.on-record (fn [ctx my-store record]
                                                             (println "Processing record : " record)
                                                             (let [key' (if (odd? (::K/key record))
                                                                          "odd"
                                                                          "even")
                                                                   sum  (+ (or (KS.store/kv-get my-store
                                                                                                key')
                                                                               0)
                                                                           (::K/value record))]
                                                               (KS.store/kv-put my-store
                                                                                key'
                                                                                sum)
                                                               (KS.ctx/forward ctx
                                                                               {::K/key   key'
                                                                                ::K/value sum})))})
      (KS.topology/add-store ["my-processor"]
                             {::K/deserializer.key   :string
                              ::K/deserializer.value :long
                              ::K/serializer.key     :string
                              ::K/serializer.value   :long
                              ::KS.store/name        "my-store"
                              ::KS.store/type        :kv.in-memory
                              ::KS.store/cache?      false})
      (KS.topology/add-sink "my-sink"
                            ["my-processor"]
                            "my-topic-2"
                            {::K/serializer.key   :string
                             ::K/serializer.value :long})))
(def app
  (KS/app "my-app-1"
          topology
          {::K/nodes         [["localhost" 9092]]
           ::KS/on-exception (fn [exception _thread]
                               (println "Exception : " exception))}))
(KS/start app)
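To know what to expect in "my-topic-2", the logic of "my-processor" can be reproduced without Kafka. In this sketch, a plain map stands in for "my-store", and the hypothetical `process-records` function returns both the final store and the records that would be forwarded to "my-sink". It assumes a single partition, hence a single stream task: with the 4 partitions created earlier, Kafka Streams runs one task per partition, each with its own copy of "my-store", so the real sums end up split across tasks.

```clojure
;; Pure-Clojure simulation of "my-processor" (no Kafka involved).
;; A map stands in for "my-store"; records are plain {:key :value} maps.
;; Assumes one partition, i.e. one task and one store instance.
(defn category
  [k]
  (if (odd? k) "odd" "even"))

(defn process-records
  "Returns [final-store forwarded-records]."
  [records]
  (reduce (fn [[store forwarded] {:keys [key value]}]
            (let [key' (category key)
                  ;; like (or (kv-get ...) 0) in the processor
                  sum  (+ (get store key' 0) value)]
              [(assoc store key' sum)
               (conj forwarded {:key key' :value sum})]))
          [{} []]
          records))

;; Same data as the producer example: keys 0..24, values (* 100 key).
(first (process-records (map (fn [i] {:key i :value (* 100 i)})
                             (range 25))))
;; => {"even" 15600, "odd" 14400}
```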
Same example as before, but in a more functional style. In addition, values are aggregated in 2-second windows (it is best to run the producer example a few times first).
First, we need a builder. Then, we add a stream fetching records from "my-topic". The records are grouped into our two categories, and each category is split into 2-second windows. Each window is then reduced to a sum. At that point, we can build a topology out of our builder. It is always a good idea to look at the description of the built topology to get a better idea of what the high-level API actually creates.
Finally, once the app is running, the window store is retrieved and each window of each category is printed.
(def topology
  (let [builder (KS.builder/builder)]
    (-> builder
        (KS.builder/stream ["my-topic"]
                           {::K/deserializer.key   :long
                            ::K/deserializer.value :long
                            ::KS/offset-reset      :earliest})
        (KS.stream/group-by (fn [k v]
                              (println (format "Grouping [%d %d]"
                                               k
                                               v))
                              (if (odd? k)
                                "odd"
                                "even"))
                            {::K/deserializer.key   :string
                             ::K/deserializer.value :long
                             ::K/serializer.key     :string
                             ::K/serializer.value   :long})
        (KS.stream/window [2 :seconds])
        (KS.stream/reduce-windows (fn reduce-window [sum k v]
                                    (println (format "Adding value %d to sum %s for key '%s'"
                                                     v
                                                     sum
                                                     k))
                                    (+ sum
                                       v))
                                  (fn seed []
                                    0)
                                  {::K/deserializer.key   :string
                                   ::K/deserializer.value :long
                                   ::K/serializer.key     :string
                                   ::K/serializer.value   :long
                                   ::KS.store/name        "my-store"
                                   ::KS.store/type        :kv.in-memory
                                   ::KS.store/cache?      false}))
    (KS.topology/topology builder)))
;; It is always interesting to see what the actual topology is.
(KS.topology/describe topology)
(def app
  (KS/app "my-app-2"
          topology
          {::K/nodes         [["localhost" 9092]]
           ::KS/on-exception (fn [exception _thread]
                               (println "Exception : " exception))}))
(KS/start app)
(def my-store
  (KS/window-store app
                   "my-store"))
(with-open [cursor (KS.store/ws-multi-range my-store)]
  (doseq [db-record (iterator-seq cursor)]
    (println (format "Aggregated key = '%s', time windows = [%d;%d), value = %d"
                     (::K/key db-record)
                     (::K/timestamp.from db-record)
                     (::K/timestamp.to db-record)
                     (::K/value db-record)))))
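The printed windows are half-open intervals `[from; to)`. Assuming `KS.stream/window` with `[2 :seconds]` creates Kafka Streams' default tumbling `TimeWindows` (fixed size, non-overlapping, aligned to the epoch), each record lands in the window starting at its timestamp rounded down to a multiple of 2000 ms. The sketch below reproduces that assignment and the seeded sum in plain Clojure. `window-start` and `windowed-sums` are illustrative names, and the records are plain maps with hypothetical `:key`, `:value` and `:timestamp` keys.

```clojure
;; Sketch of 2-second tumbling windows, assuming epoch-aligned,
;; non-overlapping windows. Timestamps are in milliseconds.
(def window-size-ms 2000)

(defn window-start
  "Start of the window [start; start + 2000) containing the timestamp."
  [timestamp]
  (* window-size-ms (quot timestamp window-size-ms)))

(defn windowed-sums
  "Sums values per [category window-start], starting from a seed of 0."
  [records]
  (reduce (fn [acc {:keys [key value timestamp]}]
            (update acc
                    [(if (odd? key) "odd" "even") (window-start timestamp)]
                    (fnil + 0)
                    value))
          {}
          records))

(windowed-sums [{:key 1 :value 100 :timestamp 1000}
                {:key 3 :value 300 :timestamp 1999}
                {:key 5 :value 500 :timestamp 2000}
                {:key 2 :value 200 :timestamp 2500}])
;; => {["odd" 0] 400, ["odd" 2000] 500, ["even" 2000] 200}
```

A timestamp of exactly 2000 ms opens a new window, which matches the `[from; to)` notation used when printing the window store.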
Copyright © 2017-2018 Adam Helinski
Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.