[net.tbt-post/clj-kafka-x "0.9.0"]
A Clojure library for the Apache Kafka (distributed stream-processing software platform).
Uses KF protocol and does not rely on ZooKeeper.
Tries to be as lightweight as possible thus depends only on
org.apache.kafka/kafka-clients "4.2.0"
| The Zookeeper dependency has been removed as it’s not required by modern Kafka clients for standard operations. |
Some builds (for instance of v0.4.x branch) may partially (sometimes even fully) be incompatible with some versions of other libraries that also use NIO! If you’re experiencing build problems and/or your application is unexpectedly crashed on start - try check your project dependencies more deeply, may be you will need to correct existing dependencies version or to add an actual version of full [io.netty/netty-all]
|
Actual library info:
Add the following to your Leiningen’s
project.clj:
[net.tbt-post/clj-kafka-x "0.9.0"]
(require '[clj-kafka-x.producer :as kp])
(with-open [p (kp/producer {"bootstrap.servers" "localhost:9092"}
(kp/string-serializer)
(kp/string-serializer))]
@(kp/send p (kp/record "topic-a" "Hi there!")))
(require '[clj-kafka-x.consumers.simple :as kc])
(with-open [c (kc/consumer {"bootstrap.servers" "localhost:9092"
"group.id" "consumer-id"}
(kc/string-deserializer)
(kc/string-deserializer))]
(kc/subscribe c "topic-a")
(kc/messages c))
Share consumers (KIP-932) allow multiple consumers to read from the same partition concurrently, solving the Head of Line Blocking problem.
(require '[clj-kafka-x.consumers.shared :as ks])
(with-open [c (ks/share-consumer {"bootstrap.servers" "localhost:9092"
"group.id" "my-share-group"}
(ks/string-deserializer)
(ks/string-deserializer))]
(ks/subscribe c "topic-a")
(let [msgs (ks/messages c)]
(doseq [msg msgs]
(ks/acknowledge c msg))
(ks/commit-sync c)
msgs))
Records can be acknowledged with :accept (default), :release, :reject, or :renew:
(ks/acknowledge c msg :reject) ;; reject permanently
(ks/acknowledge c msg :release) ;; release for redelivery
When you use multiple partitions per topic it is required
to specify them explicitly when subscribing, i.e.
(kc/subscribe
c [{:topic "topic-a" :partitions #{0 1}}
{:topic "topic-b" :partitions #{0 1 2}}])
|
(ns buzz.consumer.kafka
(:require [clj-kafka-x.consumers.simple :as kc]
[clojure.tools.logging :as log]))
(defn processor [msg schema] msg)
(def schema nil)
(def config {"bootstrap.servers" "localhost:9092"
"group.id" "consumer-id"})
(defn process-message [msg]
(let [{:keys [value topic partition offset]} msg
processor processor ;; choose one by topic name
schema schema] ;; choose one by topic name
(if (fn? processor) (processor value schema) value)))
(defn consume []
(with-open [c (kc/consumer config
(kc/byte-array-deserializer)
(kc/byte-array-deserializer))]
(kc/subscribe c (config/kafka-topics))
(let [pool (kc/messages c)]
(doseq [message pool]
(log/warn (process-message message))))))
you may also use specific timeouts form
(defn- consume [instance process-message]
(when-let [co (kc/consumer config
(kc/byte-array-deserializer)
(kc/byte-array-deserializer))
messages (kc/messages
co
:timeout (:request-timeout-ms config))]
(doall (map process-message messages))))
message count per poll execution may be specified by max.poll.records field of configuration
$ lein install
The project includes a comprehensive test suite with ~82% code coverage.
$ lein with-profile dev test
For test coverage reports:
$ lein with-profile dev cloverage
Current coverage:
| Namespace | Forms | Lines |
|---|---|---|
clj-kafka-x.consumers.shared | 81.60% | 89.09% |
clj-kafka-x.consumers.simple | 80.65% | 83.13% |
clj-kafka-x.data | 81.95% | 83.95% |
clj-kafka-x.impl.helpers | 87.90% | 93.55% |
clj-kafka-x.producer | 76.09% | 81.82% |
Total | 81.70% | 85.51% |
The project includes integration tests that require a running Kafka instance. By default, these tests are skipped. To run them:
(binding [clj-kafka-x.integration-test/*run-integration-tests* true
clj-kafka-x.integration-test/*kafka-bootstrap-servers* "localhost:9092"]
(clj-kafka-x.integration-test/run-integration-tests))
See CHANGELOG.adoc for version history.
Copyright © 2016-2026
Distributed under the Apache License v 2.0
Can you improve this documentation? These fine people already did:
source-c, AI, A I & MelKoriEdit on GitHub
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |