clj-kafka-x

A Clojure library for the Apache Kafka stream-processing software platform.

This library is NOT compatible with Kafka clusters below version 0.10. Use clj-kafka for Kafka 0.8 and 0.9 clusters, and kafkian for 0.9 and possibly 0.10.0.0. This library is developed mainly against 0.10.0.1 and 0.10.1.0.

Installation

Add the following to the :dependencies vector in your Leiningen project.clj:

[net.tbt-post/clj-kafka-x "0.3.1"]
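
For projects built with the Clojure CLI instead of Leiningen, the same coordinate would usually be written in deps.edn as below. This is a sketch that assumes the artifact is published under the same group, name, and version shown above; the README itself only documents the Leiningen form.

```clojure
;; deps.edn (assumed equivalent of the Leiningen coordinate above)
{:deps {net.tbt-post/clj-kafka-x {:mvn/version "0.3.1"}}}
```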

Usage

Producer

(require '[clj-kafka-x.producer :as kp])

(with-open [p (kp/producer {"bootstrap.servers" "localhost:9092"}
                           (kp/string-serializer)
                           (kp/string-serializer))]
  @(kp/send p (kp/record "topic-a" "Hi there!")))

Consumer

(require '[clj-kafka-x.consumers.simple :as kc])

(with-open [c (kc/consumer {"bootstrap.servers" "localhost:9092"
                            "group.id" "consumer-id"}
                            (kc/string-deserializer)
                            (kc/string-deserializer))]
  (kc/subscribe c "topic-a")
  (kc/messages c))

When a topic has multiple partitions, you must specify them explicitly when subscribing, for example:

(kc/subscribe c [{:topic "topic-a" :partitions #{0 1}}
                 {:topic "topic-b" :partitions #{0 1 2}}])
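
Writing the partition sets by hand gets tedious with many topics. A minimal sketch of a helper that builds the same structure from a map of topic name to partition count follows. `partition-specs` is a hypothetical name, not part of clj-kafka-x, and the sketch assumes partitions are numbered from 0 without gaps.

```clojure
;; Hypothetical helper, not part of clj-kafka-x.
(defn partition-specs
  "Expands a map of topic name -> partition count into a vector of
  {:topic ... :partitions #{...}} maps, assuming partitions are numbered 0..n-1."
  [topic->partition-count]
  (mapv (fn [[topic n]]
          {:topic topic :partitions (set (range n))})
        topic->partition-count))

(partition-specs {"topic-a" 2 "topic-b" 3})
;; => [{:topic "topic-a", :partitions #{0 1}}
;;     {:topic "topic-b", :partitions #{0 1 2}}]
```

The result can then be passed as the second argument to kc/subscribe in place of the literal vector.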

Real-life (almost) example

(ns buzz.consumer.kafka
  (:require [clj-kafka-x.consumers.simple :as kc]
            [clojure.tools.logging :as log]))

(defn processor [msg schema] msg)
(def schema nil)
(def config {"bootstrap.servers" "localhost:9092"
             "group.id" "consumer-id"})

(defn process-message [msg]
  (let [{:keys [value topic partition offset]} msg
        processor processor ;; choose one by topic name
        schema schema]      ;; choose one by topic name
    (if (fn? processor) (processor value schema) value)))

(defn consume []
  (with-open [c (kc/consumer config
                             (kc/byte-array-deserializer)
                             (kc/byte-array-deserializer))]
    (kc/subscribe c "topic-a") ;; replace with your own topic(s)
    (let [pool (kc/messages c)]
      (doseq [message pool]
        (log/warn (process-message message))))))
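
The "choose one by topic name" comments in process-message could be realized with a lookup table keyed by topic. The sketch below is one possible shape, not the author's implementation: the `handlers` map, its two example processors, and the name `dispatch-message` are all hypothetical. It only assumes that a message is a map with :topic and :value keys, as in the destructuring above.

```clojure
(require '[clojure.string :as str])

;; Hypothetical per-topic handlers; each processor takes the value and a schema.
(def handlers
  {"topic-a" {:processor (fn [value _schema] (str/upper-case value))
              :schema    nil}
   "topic-b" {:processor (fn [value schema] (select-keys value schema))
              :schema    [:id :name]}})

(defn dispatch-message
  "Picks the processor and schema by topic name; returns the value
  unchanged when no handler is registered for the topic."
  [{:keys [topic value]}]
  (if-let [{:keys [processor schema]} (get handlers topic)]
    (processor value schema)
    value))

(dispatch-message {:topic "topic-a" :value "hi"})
;; => "HI"
```

Keeping the table as plain data means a new topic can be supported by adding one map entry, without touching the consume loop.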

Admin

Some of the code below may not work!

(require '[clj-kafka-x.admin :as ka])

(def zk-util (ka/zk-utils "localhost:2181"))

(ka/create-topic zk-util "events")

(ka/create-topic zk-util "events2" :partition 3 :replication-factor 3)

(ka/create-topic zk-util "events-store" :topic-config {"cleanup.policy" "compact"})

(ka/topic-exists? zk-util "events")

(ka/delete-topic zk-util "events2")

Manual Build

$ lein install

License

Copyright © 2016-2018

Distributed under the Apache License v 2.0
