Felice is client library for Apache Kafka in Clojure.

Built with simplicity it mind, it support by default JSON, Transit & Nippy (Fast | LZ4) and provide also custom Serializer / Deserializer mechanism

Note: the version contains in a first time the Kafka Client then the felice version, separated by hyphen.


Json safe:json-safe
Transit MessagePack:t+mpack
Transit Json:t+json
Nippy Fast:nippy+fast
Nippy LZ4:nippy+lz4

Beware that any exception during the deserialization process (eg: malformed json) will be thrown by the poll call. This may result in a silent dead poll-loop.

Using a safe deserializer is an option, it will return a map containing the raw value (as a string) and a :felice.serialization/error key containing the exception instead of the deserialized value of the record.

Producing records

(require '[felice.producer :as fp])

(let [producer (fp/producer {:bootstrap.servers "localhost:9092" ;required
                             :key.serializer :string             ;required
                             :value.serializer :t+json           ;required
  ;; publish without key
  (fp/send! producer "topic1" "value")
  ;; publish with a key
  (fp/send! producer "topic2" "key" "value")
  ;; map syntax
  (fp/send! producer {:topic "topic3" :key "key" :value "value"})
  ;; remember to close you producer
  (fp/close! producer))

Consuming records

(require '[felice.consumer :as fc])

(defn print-record [{:keys [topic partition offset timestamp key value]}]
   (println (format "Record %s(%d) %d @%d - Key = %s, Value = %s"
                    topic partition offset timestamp key value)))

(def consumer-cfg {:bootstrap.servers "localhost:9092" ;required
           "my-group"               ;required
                    :auto.offset.reset "latest"        ;or "earliest", used at first startup only
                    :key.serializer    :string         ;required
                    :value.serializer  :json           ;required
           5000      ;delay between auto commits
                    :topics #{"topic1" "topic2"}       ;auto subscribes at startup

Do it yourself

(let [consumer (fc/consumer consumer-cfg)]

;; subscribe can take multiple topics at once
  (fc/subscribe consumer "topic1" "topic2")
;; and keep your previous subscriptions
  (fc/subscribe consumer "topic3")

;; basic polling
  (let [cr (fc/poll consumer 100)
        records (fc/consumer-records->all-records cr)]
    (doseq [record records] (print-record record))
    (fc/close! consumer))

Using poll-loop or poll-loops

;; poll-loop
  (let [stop-fn (fc/poll-loop consumer-cfg print-record)]
    ;; consumes in a future thread
    ;; call the returned fn when you want to stop polling
;; poll-loops
  (let [stop-fn (fc/poll-loops consumer-cfg print-record {:threads-by-topic 4})]
    ;; spawns 4 consumers by registered topic, each in its own future thread
    ;; call the returned fn when you want to stop polling

Commit policy

  • :never does nothing (use it if you enabled client auto commit)
  • :poll commit last read offset after processing all the items of a poll
  • :record commit the offset of every processed record


You can set either :threads-by-topic or :threads option (if both are set, :threads-by-topic will win)

  • :threads spawn N threads total (each thread listening all registered topic)
  • :threads-by-topic spawn N threads for each registered topic
  • you can also provide a map {:topic :threads} instead of a list of topics


;; Commit after each record processed spawning 8 threads (4 for topic1 and 4 for topic2)
(fc/poll-loops consumer-cfg print-record ["topic1" "topic2"] {:commit-policy :record :threads-by-topic 4})
;; No committing 1 thread for topic1 and 4 for topic2
(fc/poll-loops consumer-cfg print-record {"topic1" 1 "topic2" 4} {:commit-policy :never})


The partitionner used by a producer can be set using this configuration key.

See default partitionner implementation

There is a comment at the bottom of the felice.producer namespace mimiking the default partitionner if you want to see the result for some keys.

;; default partitionner
(import 'org.apache.kafka.common.utils.Utils)
(defn partition-from-bytes [partition-count bytes]
  (mod (Utils/toPositive (Utils/murmur2 bytes)) partition-count))
(defn partition-from-string [partition-count string]
  (partition-from-bytes partition-count (.getBytes string)))


Copyright © 2018 - 2022 Oscaro

This program and the accompanying materials are made available under the terms of the Eclipse Public License 2.0 which is available at

This Source Code may also be made available under the following Secondary Licenses when the conditions for such availability set forth in the Eclipse Public License, v. 2.0 are satisfied: GNU General Public License as published by the Free Software Foundation, either version 2 of the License, or (at your option) any later version, with the GNU Classpath Exception which is available at

