A Clojure Apache Kafka client with a core.async API

[com.appsflyer/ketu "1.0.0"]


  • Channels API: Take Kafka data from a channel and send data to Kafka through a channel.
  • Consumer Source: Polls records from Kafka and puts them on a channel.
  • Producer Sink: Takes records from a channel and sends them to Kafka.
  • Shapes: Transform the original objects of the Java client to Clojure data and back.
  • Simple Configuration: Friendly, validated configuration.

Minimal Example

Consume a name string from Kafka and produce a greeting string for that name back into Kafka, all through channels:

(ns example
  (:require [clojure.core.async :refer [chan close! <!! >!!]]
            [ketu.async.source :as source]
            [ketu.async.sink :as sink]))
(let [<names (chan 10)
      source-opts {:name "greeter-consumer"
                   :brokers "broker1:9092"
                   :topic "names"
                   :group-id "greeter"
                   :value-type :string
                   :shape :value}
      source (source/source <names source-opts)

      >greets (chan 10)
      sink-opts {:name "greeter-producer"
                 :brokers "broker2:9091"
                 :topic "greetings"
                 :value-type :string
                 :shape :value}
      sink (sink/sink >greets sink-opts)]

  ;; Consume a name and produce a greeting. You could also do this with e.g. clojure.core.async/pipeline.
  (->> (<!! <names)
       (str "Hi, ")
       (>!! >greets))

  ;; Close the source. It automatically closes the source channel `<names`.
  (source/stop! source)
  ;; Close the sink channel `>greets`. It causes the sink to close itself as a consequence.
  (close! >greets))
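
The comment in the example mentions clojure.core.async/pipeline as an alternative to the single take-and-put. A minimal sketch of that idea follows. It uses plain channels in place of the Kafka-backed ones so it can run without a broker; the function name `greet-pipeline` is hypothetical and not part of ketu.

```clojure
(ns example.pipeline
  (:require [clojure.core.async :refer [chan close! pipeline <!! >!!]]))

(defn greet-pipeline
  "Hypothetical helper: turns every name taken from `in` into a greeting on `out`.
  By default, pipeline closes `out` once `in` is closed and drained."
  [in out]
  (pipeline 1 out (map #(str "Hi, " %)) in))

;; Plain channels stand in for the source channel and the sink channel.
(let [<names (chan 10)
      >greets (chan 10)]
  (greet-pipeline <names >greets)
  (>!! <names "Alice")
  (println (<!! >greets))
  ;; Closing the input propagates to the output channel.
  (close! <names)
  (println (<!! >greets)))
```

With real ketu channels, the same close propagation would presumably mean that stopping the source closes `<names`, the pipeline then closes `>greets`, and the sink shuts down as described in the comments above.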

Configuration reference

Anything that is not documented is not supported and might change.

For the default values used by the underlying Kafka clients (v3.3.1), see the Apache Kafka consumer and producer configuration documentation.

Note: int is used for brevity but can also mean long. Don't worry about it.

Common options (both source and sink accept these)

| Key | Type | Required? | Notes |
|---|---|---|---|
| :brokers | string | required | Comma-separated host:port values, e.g. "broker1:9092,broker2:9092" |
| :name | string | required | Simple human-readable identifier, used in logs and thread names |
| :key-type | :string, :byte-array | optional | Default :byte-array, used in configuring the key serializer/deserializer |
| :value-type | :string, :byte-array | optional | Default :byte-array, used in configuring the value serializer/deserializer |
| :internal-config | map | optional | A map of the underlying Java client properties, for any extra lower-level config |

Consumer-source options

| Key | Type | Required? | Notes |
|---|---|---|---|
| :shape | :value, [:vector <fields>], [:map <fields>], or an arity-1 function of ConsumerRecord | optional | If unspecified, the channel will contain ConsumerRecord objects. See Data shapes for examples. |

Producer-sink options

| Key | Type | Required? | Notes |
|---|---|---|---|
| :shape | :value, [:vector <fields>], [:map <fields>], or an arity-1 function of the input returning ProducerRecord | optional | If unspecified, you must put ProducerRecord objects on the channel. See Data shapes for examples. |
| :compression-type | "none", "gzip", "snappy", "lz4", "zstd" | optional | Default "none", values are the same as "compression.type" of the Java producer |
| :workers | int | optional | Default 1, number of threads that take from the channel and invoke the internal producer |
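
As an illustration of how the common and producer-sink options combine, here is a sketch of a sink options map. The broker addresses and topic are placeholders. The `:topic` key is taken from the minimal example above. The shape of `:internal-config` (string property names as keys) is an assumption, not something this reference spells out.

```clojure
;; A sketch of sink options; all concrete values are placeholders.
(def sink-opts
  {:name "greeter-producer"
   :brokers "broker1:9092,broker2:9092"
   :topic "greetings"
   :key-type :string
   :value-type :string
   :shape [:vector :key :value]
   :compression-type "lz4"
   ;; Two threads take from the channel and invoke the internal producer.
   :workers 2
   ;; Assumption: keys are the Java client's property names as strings.
   :internal-config {"linger.ms" 5}})
```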

Data shapes

You don't have to deal with ConsumerRecord or ProducerRecord objects.
To get a Clojure data structure with any of the ConsumerRecord fields, configure the consumer shape:

; Value only:
{:topic "names"
 :key-type :string
 :value-type :string
 :shape :value}
(<!! consumer-chan)
;=> "v"

; Vector:
{:shape [:vector :key :value :topic]}
(<!! consumer-chan)
;=> ["k" "v" "names"]

; Map:
{:shape [:map :key :value :topic]}
(<!! consumer-chan)
;=> {:key "k", :value "v", :topic "names"}
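
Since stopping the source closes its channel, a consumer loop can simply take until it receives nil. The sketch below assumes map-shaped records like the ones shown above and uses a plain channel so it runs without a broker; `collect-values` is a hypothetical helper, not part of ketu.

```clojure
(ns example.consume
  (:require [clojure.core.async :refer [chan close! <!! >!!]]))

(defn collect-values
  "Hypothetical helper: takes map-shaped records from `ch` until it closes
  and returns the values grouped by topic."
  [ch]
  (loop [acc {}]
    (if-let [{:keys [topic value]} (<!! ch)]
      (recur (update acc topic (fnil conj []) value))
      acc)))

;; A plain channel stands in for the consumer channel.
(let [ch (chan 10)]
  (>!! ch {:key "k1" :value "v1" :topic "names"})
  (>!! ch {:key "k2" :value "v2" :topic "names"})
  (close! ch)
  (collect-values ch))
;=> {"names" ["v1" "v2"]}
```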

Similarly, to put a Clojure data structure on the producer channel:

; Value only:
{:key-type :string
 :value-type :string
 :shape :value}
(>!! producer-chan "v")

; Vector:
{:shape [:vector :key :value]}
(>!! producer-chan ["k" "v"])

; Vector with topic in each message:
{:shape [:vector :key :value :topic]}
(>!! producer-chan ["k1" "v1" "names"])
(>!! producer-chan ["k2" "v2" "events"])
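
The option tables also allow :shape to be an arity-1 function. The sketch below shows what such functions might look like. It assumes the Kafka Java client classes are on the classpath (they should come in with ketu). The function names, the "greetings" topic, and the input map keys are hypothetical.

```clojure
(ns example.shapes
  (:import (org.apache.kafka.clients.consumer ConsumerRecord)
           (org.apache.kafka.clients.producer ProducerRecord)))

(defn record->pair
  "Consumer side: receives a ConsumerRecord and returns [key value]."
  [^ConsumerRecord record]
  [(.key record) (.value record)])

(defn greeting->record
  "Producer side: receives the value put on the channel and returns a ProducerRecord."
  [{:keys [name greeting]}]
  (ProducerRecord. "greetings" name greeting))

;; Passed as the :shape option (other options omitted):
;; {:shape record->pair}      ; source
;; {:shape greeting->record}  ; sink
```

A function shape is useful when the built-in :value, :vector, and :map shapes do not fit, for example when the record must be built from a nested map.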

Development & Contribution

We welcome feedback and would love to hear about use cases other than ours. You can open issues, send pull requests, or contact us on the Clojurians Slack.
