Utilities to make querying the data in Kafka topics easier.
Add [kafka.utils "0.0.1"] as a dependency.

;; require kafka-utils and datascript
(require '[kafka-utils.core :as ku]
         '[datascript.core :as d])

;; Get a core.async channel of all the records in a topic.
;; For the serdes, I'd recommend using franzy, but this isn't required.
(def channel
  (ku/read-from-beginning "backblaze_smart" ["localhost:9092"]
                          (franzy.deserializers/string-deserializer)
                          (franzy.json.deserializers/json-deserializer {:key-fn true})))

;; Create a datascript db
(def conn (d/create-conn))

;; Sink the topic data into the datascript db
(ku/sink-to-db! channel conn)
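
Once the records are in the db, they can be queried with ordinary datascript queries. The sketch below is only an illustration: it assumes each record becomes one entity carrying keyword attributes such as :kafka.record/topic, :kafka.record/key and :kafka.record/offset, and it fills a stand-in db (example-conn, a hypothetical name) with made-up sample records so it runs without a Kafka broker. In real use you would query the conn that sink-to-db! filled instead.

```clojure
(require '[datascript.core :as d])

;; Stand-in for the conn filled by sink-to-db! (hypothetical name).
(def example-conn (d/create-conn))

;; Made-up sample records, shaped the way we ASSUME sunk records look.
(d/transact! example-conn
             [{:db/id -1
               :kafka.record/topic "backblaze_smart"
               :kafka.record/key "disk-1"
               :kafka.record/offset 0
               :kafka.record/timestamp 1500000000000}
              {:db/id -2
               :kafka.record/topic "backblaze_smart"
               :kafka.record/key "disk-2"
               :kafka.record/offset 1
               :kafka.record/timestamp 1500000001000}])

;; Find the key and offset of every record that came from one topic.
(def results
  (d/q '[:find ?key ?offset
         :in $ ?topic
         :where
         [?e :kafka.record/topic ?topic]
         [?e :kafka.record/key ?key]
         [?e :kafka.record/offset ?offset]]
       @example-conn
       "backblaze_smart"))
```

The topic name is passed in as a query input rather than hard-coded, so the same query can be reused if several topics are sunk into one db.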
kafka.record/key, kafka.record/timestamp, kafka.record/offset, and kafka.record/topic.

Distributed under the GPL V3.