"When Gregor Samsa woke up one morning from unsettling dreams, he found himself changed in his bed into a monstrous vermin."
— Franz Kafka, The Metamorphosis
Kafka Metamorphosis is a comprehensive Clojure wrapper that transforms the Java Kafka APIs into an elegant, idiomatic Clojure interface. Just as Gregor Samsa underwent his transformation, this library metamorphoses complex Java APIs into simple, functional Clojure code that feels natural and powerful.
Add this to your project.clj
dependencies:
Or for deps.edn:
org.clojars.caioclavico/kafka-metamorphosis {:mvn/version "0.3.0"}
For tools.deps CLI:
clj -Sdeps '{:deps {org.clojars.caioclavico/kafka-metamorphosis {:mvn/version "0.3.0"}}}'
📚 Detailed Guides:
The simplest way to get started with Kafka Metamorphosis:
(require '[kafka-metamorphosis.core :as km])
;; Send a simple message
(km/send-message! "my-topic" "Hello, Kafka!")
;; Send a JSON message
(km/send-json-message! "events" {:user-id 123 :action "login"})
;; Consume messages
(take 10 (km/consume-messages! "my-group" ["my-topic"]))
;; Create a topic
(km/create-topic! "new-topic" {:partitions 6})
;; Check cluster health
(km/health-check)
(require '[kafka-metamorphosis.core :as km]
'[kafka-metamorphosis.serializers :as serializers])
;; Producer with JSON serialization
(def producer (km/create-producer
(km/producer-config {:acks "all"}
serializers/simple-json-serializers)))
;; Consumer with Avro deserialization
(def consumer (km/create-consumer
(km/consumer-config "my-group" {}
(serializers/avro-deserializers "http://schema-registry:8081"))))
For more control, use the individual namespaces:
(require '[kafka-metamorphosis.producer :as producer]
'[kafka-metamorphosis.consumer :as consumer]
'[kafka-metamorphosis.admin :as admin])
;; Producer
(def p (producer/create (km/producer-config)))
(producer/send! p "my-topic" "key" "value")
(producer/close! p)
;; Consumer
(def c (consumer/create (km/consumer-config "my-group")))
(consumer/subscribe! c ["my-topic"])
(consumer/poll! c 1000)
(consumer/close! c)
;; Admin operations
(def a (admin/create-admin-client (km/admin-config)))
(admin/create-topic! a "new-topic" {:partitions 3})
(admin/list-topics a)
(admin/close! a)
Ensure message integrity with built-in schema validation:
(require '[kafka-metamorphosis.schema :as schema])
;; Define a schema
(schema/defschema :user-schema
{:user-id int?
:name string?
:email string?
:active boolean?})
;; Validate messages
(schema/validate-message
{:user-id 123 :name "John" :email "john@example.com" :active true}
:user-schema) ; => true
;; Send validated messages
(schema/send-schema-message!
"users"
{:user-id 456 :name "Jane" :email "jane@example.com" :active true}
:user-schema)
;; Consume with validation
(schema/consume-schema-messages! "user-group" ["users"] :user-schema)
Kafka Metamorphosis includes a CLI for quick operations:
# Check cluster health
lein run health
# List topics
lein run topics
# Send a message
lein run send my-topic "Hello, World!"
# Send JSON message
lein run send-json events '{"user": "john", "action": "login"}'
Multiple serialization formats are supported out of the box:
(require '[kafka-metamorphosis.serializers :as serializers])
;; String serialization (default)
serializers/string-serializers
serializers/string-deserializers
;; Simple JSON (no schema registry)
serializers/simple-json-serializers
serializers/simple-json-deserializers
;; Confluent JSON with Schema Registry
serializers/json-serializers
serializers/json-deserializers
;; Avro with Schema Registry
(serializers/avro-serializers "http://schema-registry:8081")
(serializers/avro-deserializers "http://schema-registry:8081")
;; Protobuf with Schema Registry
(serializers/protobuf-serializers "http://schema-registry:8081")
(serializers/protobuf-deserializers "http://schema-registry:8081")
The easiest way to get started is using the built-in Docker development environment:
(require '[kafka-metamorphosis.dev :as dev])
;; KRaft mode (modern, no Zookeeper)
(dev/kafka-setup-kraft!)
;; Traditional mode with Zookeeper
(dev/kafka-setup-zookeeper!)
;; Schema Registry + Kafka + UI
(dev/kafka-setup-full!)
See the Docker Setup Guide for detailed instructions.
# Start REPL
lein repl
# Run tests
lein test
# Build
lein uberjar
# Health check
lein run health
Kafka Metamorphosis uses Clojure maps for configuration, automatically converting them to Java Properties. All standard Kafka configuration options are supported:
;; Producer configuration
(km/producer-config "localhost:9092"
{:acks "all"
:retries 3
:batch-size 16384
:linger-ms 1}
serializers/json-serializers)
;; Consumer configuration
(km/consumer-config "localhost:9092"
"my-group"
{:auto-offset-reset "earliest"
:enable-auto-commit true}
serializers/json-deserializers)
The main entry point with high-level convenience functions:
(require '[kafka-metamorphosis.core :as km])
;; Configuration builders
(km/producer-config)
(km/consumer-config "group-id")
(km/admin-config)
;; High-level messaging
(km/send-message! "topic" "value")
(km/send-json-message! "topic" {:data "value"})
(km/consume-messages! "group" ["topic1" "topic2"])
(km/consume-json-messages! "group" ["topic"])
;; Admin operations
(km/create-topic! "topic" {:partitions 6})
(km/list-topics)
(km/topic-exists? "topic")
(km/health-check)
Serialization configurations for different data formats:
(require '[kafka-metamorphosis.serializers :as ser])
;; Available serializers
ser/string-serializers
ser/simple-json-serializers
ser/json-serializers
(ser/avro-serializers "schema-registry-url")
(ser/protobuf-serializers "schema-registry-url")
;; JSON utilities
(ser/to-json {:key "value"})
(ser/from-json "{\"key\":\"value\"}")
For fine-grained control:
;; Producer operations
(require '[kafka-metamorphosis.producer :as producer])
(producer/create config)
(producer/send! producer topic key value)
(producer/close! producer)
;; Consumer operations
(require '[kafka-metamorphosis.consumer :as consumer])
(consumer/create config)
(consumer/subscribe! consumer topics)
(consumer/poll! consumer timeout)
(consumer/close! consumer)
;; Admin operations
(require '[kafka-metamorphosis.admin :as admin])
(admin/create-admin-client config)
(admin/create-topic! client topic-name opts)
(admin/list-topics client)
(admin/close! client)
(require '[kafka-metamorphosis.core :as km])
;; Send events
(km/send-json-message! "user-events"
{:user-id 123
:action "login"
:timestamp (System/currentTimeMillis)})
;; Process events
(doseq [message (take 100 (km/consume-json-messages "processors" ["user-events"]))]
(let [event (:value message)]
(println "Processing event for user" (:user-id event))))
;; Consume from multiple topics with custom processing
(let [messages (km/consume-messages "analytics"
["page-views" "clicks" "purchases"]
{:max-messages 1000 :timeout-ms 5000})]
(doseq [msg messages]
(case (:topic msg)
"page-views" (process-page-view msg)
"clicks" (process-click msg)
"purchases" (process-purchase msg))))
git checkout -b feature/amazing-feature
)lein test
)Kafka Metamorphosis is organized into focused namespaces:
kafka-metamorphosis/
├── core # High-level API and convenience functions
├── producer # Producer operations and configuration
├── consumer # Consumer operations and configuration
├── admin # Administrative operations
├── serializers # Serialization/deserialization configs
└── dev # Development utilities and Docker setup
This project draws inspiration from Franz Kafka's "The Metamorphosis," where the protagonist undergoes a dramatic transformation. Similarly, this library transforms the Java Kafka API into something more suitable for the Clojure ecosystem - a metamorphosis from imperative to functional, from complex to simple, from verbose to elegant.
Copyright © 2025 Kafka Metamorphosis Contributors
This program and the accompanying materials are made available under the terms of the Eclipse Public License 2.0 which is available at http://www.eclipse.org/legal/epl-2.0.
This Source Code may also be made available under the following Secondary Licenses when the conditions for such availability set forth in the Eclipse Public License, v. 2.0 are satisfied: GNU General Public License as published by the Free Software Foundation, either version 2 of the License, or (at your option) any later version, with the GNU Classpath Exception which is available at https://www.gnu.org/software/classpath/license.html.
The metamorphosis is complete! 🦋
Transform your Kafka experience with the elegance of Clojure.
Can you improve this documentation? These fine people already did:
Caio Clavico & Caio Henrique Clavico CrizantoEdit on GitHub
cljdoc builds & hosts documentation for Clojure/Script libraries
Ctrl+k | Jump to recent docs |
← | Move to previous article |
→ | Move to next article |
Ctrl+/ | Jump to the search field |