A Clojure library designed to provide a simple interface to consume from Kafka for debugging purposes. For example, you can quickly retrieve records for the queries like
clojure.core.async
to communicate back the kafka records.Add the following to the dependencies section of project.clj
[io.github.ashwinbhaskar/kafka-util "0.1.1"]
(:require [kafka-util.core :as ku]
[clojure.core.async :refer [thread chan >!! <!! close! timeout]])
(def consumer-settings {:broker "localhost"
:port 9092
:security-protocol "PLAIN_TEXT"
:decode-value-as-json true
:key-deserializer :string})
(defn process-records
[records]
(->> records
(run! (fn [{:keys [value partition offset topic headers]}]
(println value)))))
(comment
(let [topic "my-topic"
channel (timeout 1000000)
minutes-ago 60
partition 2
offset 564646
total-partitions 8]
(thread
(ku/consume-records-latest consumer-settings topic channel)
(ku/consume-records-latest consumer-settings topic channel partition)
(ku/consume-records-minutes-ago consumer-settings topic channel minutes-ago)
(ku/consume-records-minutes-ago consumer-settings topic channel minutes-ago partition)
(ku/consume-records-offset consumer-settings topic channel partition offset)
(ku/compute-partition (.getBytes "my-key") total-partitions))
(loop []
(if-let [records (<!! channel)]
(do
(process-records records)
(recur))
(println "Channel is closed!")))))
Can you improve this documentation?Edit on GitHub
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close