A Clojure Apache Kafka client with a core.async API
[com.appsflyer/ketu "0.6.0"]
Produce a name string into Kafka and read the same string back from Kafka, all through channels:
(ns example
  (:require [clojure.core.async :as async]
            [ketu.async.source :as source]
            [ketu.async.sink :as sink]))

(defn produce-and-consume-example []
  (let [source-chan (async/chan 10)
        source-opts {:name "names-consumer"
                     :brokers "broker1:9092"
                     :topic "names"
                     :group-id "names-consumer-group"
                     :value-type :string
                     :shape :value}
        source (source/source source-chan source-opts)
        sink-opts {:name "names-producer"
                   :brokers "broker1:9091"
                   :topic "names"
                   :value-type :string
                   :shape :value}
        sink-chan (async/chan 10)
        sink (sink/sink sink-chan sink-opts)]
    ;; Produce a name to Kafka
    (async/>!! sink-chan "bob")
    ;; Consume a name from Kafka
    (async/<!! source-chan)
    ;; Stop the source. This closes source-chan automatically.
    (source/stop! source)
    ;; Close sink-chan. This closes the sink automatically.
    (async/close! sink-chan)))
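The example above takes a single value. A longer-running consumer would usually keep taking until the channel is closed, which, per the comments above, happens once the source is stopped. The following is a minimal sketch of that loop using only core.async; `drain!` is a hypothetical helper (not part of ketu), and a plain pre-filled channel stands in for `source-chan` so the snippet runs without a broker.

```clojure
(require '[clojure.core.async :as async])

(defn drain!
  "Takes values from ch until it is closed and returns them in a vector.
  Hypothetical helper for illustration, not part of ketu."
  [ch]
  (loop [acc []]
    ;; <!! returns nil once the channel is closed and its buffer is empty
    (if-some [v (async/<!! ch)]
      (recur (conj acc v))
      acc)))

;; Stand-in for a source channel: buffered, pre-filled, then closed
(def names-chan (async/chan 10))
(async/>!! names-chan "bob")
(async/>!! names-chan "alice")
(async/close! names-chan)

(def drained (drain! names-chan))
;; drained => ["bob" "alice"]
```

Values already buffered when the channel closes are still delivered before `nil`, so nothing that reached the channel is dropped by the loop itself.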
Anything that is not documented is not supported and might change.
Note: int is used for brevity but can also mean long. Don't worry about it.
Common options (accepted by both source and sink):

Key | Type | Req? | Notes |
---|---|---|---|
:brokers | string | required | Comma-separated host:port values, e.g. "broker1:9092,broker2:9092" |
:topic | string | required | |
:name | string | required | Simple human-readable identifier, used in logs and thread names |
:key-type | :string, :byte-array | optional | Default :byte-array, used to configure the key serializer/deserializer |
:value-type | :string, :byte-array | optional | Default :byte-array, used to configure the value serializer/deserializer |
:internal-config | map | optional | A map of the underlying Java client properties, for any extra lower-level config |

Consumer (source) options:

Key | Type | Req? | Notes |
---|---|---|---|
:group-id | string | required | |
:shape | :value, [:vector <fields>], [:map <fields>], or an arity-1 function of ConsumerRecord | optional | If unspecified, the channel will contain ConsumerRecord objects. See the shape examples below. |

Producer (sink) options:

Key | Type | Req? | Notes |
---|---|---|---|
:shape | :value, [:vector <fields>], [:map <fields>], or an arity-1 function of the input returning ProducerRecord | optional | If unspecified, you must put ProducerRecord objects on the channel. See the shape examples below. |
:compression-type | "none", "gzip", "snappy", "lz4", "zstd" | optional | Default "none", values are the same as "compression.type" of the Java producer |
:workers | int | optional | Default 1, number of threads that take from the channel and invoke the internal producer |
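
Because the source and the sink accept several of the same keys, one way to keep the two option maps consistent is to build them from a shared base map. This is only a sketch using plain `merge`; the names `base-opts`, `source-opts`, and `sink-opts` and the concrete values are illustrative assumptions, and only keys listed in the tables above are used.

```clojure
;; Options shared by the source and the sink (values are illustrative)
(def base-opts
  {:brokers "broker1:9092,broker2:9092"
   :topic "names"
   :key-type :string
   :value-type :string
   :shape :value})

;; Consumer side: adds the required :group-id
(def source-opts
  (merge base-opts
         {:name "names-consumer"
          :group-id "names-consumer-group"}))

;; Producer side: adds producer-only tuning keys
(def sink-opts
  (merge base-opts
         {:name "names-producer"
          :compression-type "lz4"
          :workers 2}))
```

The resulting maps can then be passed as the options argument in the same way as the hand-written maps in the first example.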
You don't have to deal with ConsumerRecord or ProducerRecord objects.
To get a Clojure data structure with any of the ConsumerRecord fields, configure the consumer shape:
; Value only:
{:topic "names"
 :key-type :string
 :value-type :string
 :shape :value}
(<!! consumer-chan)
;=> "v"

; Vector:
{:shape [:vector :key :value :topic]}
(<!! consumer-chan)
;=> ["k" "v" "names"]

; Map:
{:shape [:map :key :value :topic]}
(<!! consumer-chan)
;=> {:key "k", :value "v", :topic "names"}
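
To make the relationship between a shape and its output concrete, here is a small pure-Clojure illustration. It is not ketu's implementation: `apply-shape` is a hypothetical helper, and a plain map stands in for a ConsumerRecord so the snippet runs without Kafka on the classpath.

```clojure
;; Illustration only: `apply-shape` is hypothetical, not part of ketu.
(defn apply-shape
  "Returns the value a consumer channel would carry for the given shape,
  treating `record` (a plain map) as a stand-in for a ConsumerRecord."
  [shape record]
  (cond
    (= shape :value) (:value record)
    (fn? shape)      (shape record)
    (vector? shape)  (let [[kind & fields] shape]
                       (case kind
                         :vector (mapv record fields)
                         :map    (select-keys record fields)))))

(def record {:key "k" :value "v" :topic "names"})

(def as-value  (apply-shape :value record))
(def as-vector (apply-shape [:vector :key :value :topic] record))
(def as-map    (apply-shape [:map :key :value :topic] record))
(def as-custom (apply-shape (fn [r] (str (:key r) "=" (:value r))) record))
;; as-value  => "v"
;; as-vector => ["k" "v" "names"]
;; as-map    => {:key "k", :value "v", :topic "names"}
;; as-custom => "k=v"
```

With the real library, a function shape receives the ConsumerRecord object itself rather than a map, as stated in the options table.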
Similarly, to put a Clojure data structure on the producer channel:
; Value only:
{:key-type :string
 :value-type :string
 :shape :value}
(>!! producer-chan "v")

; Vector:
{:shape [:vector :key :value]}
(>!! producer-chan ["k" "v"])

; Vector with topic in each message:
{:shape [:vector :key :value :topic]}
(>!! producer-chan ["k1" "v1" "names"])
(>!! producer-chan ["k2" "v2" "events"])
We welcome feedback and would love to hear about use cases other than ours. You can open issues, send pull requests, or contact us on the Clojurians Slack.