[net.tbt-post/clj-kafka-x "0.3.1"]
Add the following to your Leiningen’s `project.clj` dependencies:
[net.tbt-post/clj-kafka-x "0.3.1"]
(require '[clj-kafka-x.producer :as kp])
;; Build a producer with String serializers for both key and value, send a
;; single record to "topic-a", and deref the result of `kp/send`.
;; `with-open` closes the producer when the body exits.
(with-open [p (kp/producer {"bootstrap.servers" "localhost:9092"}
                           (kp/string-serializer)
                           (kp/string-serializer))]
  @(kp/send p (kp/record "topic-a" "Hi there!")))
(require '[clj-kafka-x.consumers.simple :as kc])
;; Build a consumer with String deserializers for both key and value,
;; subscribe it to "topic-a", and fetch messages.
;; `with-open` closes the consumer when the body exits.
(with-open [c (kc/consumer {"bootstrap.servers" "localhost:9092"
                            "group.id" "consumer-id"}
                           (kc/string-deserializer)
                           (kc/string-deserializer))]
  (kc/subscribe c "topic-a")
  (kc/messages c))
When you use multiple partitions per topic it is required
to specify them explicitly when subscribing, i.e.
;; Subscribe consumer `c` to explicit partitions of each topic.
(kc/subscribe c [{:topic "topic-a" :partitions #{0 1}}
                 {:topic "topic-b" :partitions #{0 1 2}}])
(ns buzz.consumer.kafka
(:require [clj-kafka-x.consumers.simple :as kc]
[clojure.tools.logging :as log]))
(defn processor
  "Placeholder message processor: returns `msg` unchanged and ignores `schema`."
  [msg schema]
  msg)
(def schema
  "Schema handed to the processor; no schema is used in this example."
  nil)

(def config
  "Kafka consumer configuration: broker address and consumer group id."
  {"bootstrap.servers" "localhost:9092"
   "group.id"          "consumer-id"})
(defn process-message
  "Runs the `:value` of a consumed message map through the processor.

  `msg` is destructured for `:value`, `:topic`, `:partition` and `:offset`;
  only `:value` is used here. When the selected processor is a function it is
  called with the value and the schema; otherwise the value is returned as is."
  [msg]
  (let [{:keys [value topic partition offset]} msg
        handler        processor   ;; choose one by topic name
        handler-schema schema]     ;; choose one by topic name
    (if-not (fn? handler)
      value
      (handler value handler-schema))))
(defn consume
  "Opens a consumer built from `config`, subscribes it, and logs the result of
  `process-message` for every fetched message at WARN level.

  The consumer is closed by `with-open` when the body exits."
  []
  (with-open [c (kc/consumer config
                             (kc/byte-array-deserializer)
                             (kc/byte-array-deserializer))]
    ;; NOTE(review): `config/kafka-topics` is not defined in this example (the
    ;; ns form requires no `config` alias) — presumably it comes from the
    ;; application's own configuration namespace. Substitute your topic
    ;; name(s), e.g. "topic-a", if you have no such function.
    (kc/subscribe c (config/kafka-topics))
    (let [pool (kc/messages c)]
      (doseq [message pool]
        (log/warn (process-message message))))))
Some code may not work!
(require '[clj-kafka-x.admin :as ka])
;; Handle used by the admin calls below, built from the address "localhost:2181".
(def zk-util (ka/zk-utils "localhost:2181"))

;; Create a topic without passing any options.
(ka/create-topic zk-util "events")

;; Create a topic with explicit partition and replication-factor options.
(ka/create-topic zk-util "events2" :partition 3 :replication-factor 3)

;; Create a topic with per-topic configuration (here: a compacted topic).
(ka/create-topic zk-util "events-store" :topic-config {"cleanup.policy" "compact"})

;; Check for a topic, then delete one.
(ka/topic-exists? zk-util "events")
(ka/delete-topic zk-util "events2")
$ lein install
Copyright © 2016-2018
Distributed under the Apache License v 2.0
Can you improve this documentation? Edit on GitHub
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close