(require '[kafka-metamorphosis.admin :as admin])
;; Create an admin client
(def admin-client (admin/create-admin-client {:bootstrap-servers "localhost:9092"}))
;; Create a simple topic
(admin/create-topic! admin-client "my-topic")
;; Create a topic with specific configurations
(admin/create-topic! admin-client "advanced-topic"
{:partitions 5
:replication-factor 1
:configs {"cleanup.policy" "compact"
"retention.ms" "86400000"}})
;; Check if a topic exists
(admin/topic-exists? admin-client "my-topic")
;; Create only if it doesn't exist
(admin/create-topic-if-not-exists! admin-client "safe-topic")
;; List all topics
(admin/list-topics admin-client)
;; Get topic details
(admin/describe-topic admin-client "my-topic")
;; Close the client
(admin/close! admin-client)
(require '[kafka-metamorphosis.dev :as dev])
;; Create a development topic (3 partitions, replication 1)
(dev/setup-dev-topic "dev-topic")
;; Create with custom options
(dev/setup-dev-topic "custom-topic" {:partitions 10})
;; List all topics
(dev/list-all-topics)
;; Describe a topic
(dev/describe-dev-topic "dev-topic")
;; Delete a topic (with confirmation)
(dev/delete-dev-topic "dev-topic")
;; View cluster information
(dev/cluster-info)
If you prefer to use native Kafka tools:
# Create a topic
kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092
# Describe a topic
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
# Delete a topic
kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092
(admin/create-topic! admin-client "logs-topic"
{:partitions 6
:configs {"retention.ms" "604800000" ; 7 days
"cleanup.policy" "delete"
"segment.ms" "86400000"}}) ; 1 day
(admin/create-topic! admin-client "state-topic"
{:partitions 3
:configs {"cleanup.policy" "compact"
"min.cleanable.dirty.ratio" "0.1"
"delete.retention.ms" "86400000"}})
(admin/create-topic! admin-client "high-throughput-topic"
{:partitions 12
:configs {"batch.size" "65536"
"linger.ms" "5"
"compression.type" "snappy"}})
cleanup.policy=compact
for state/unique key topicsretention.ms
based on business requirementscompression.type=snappy
or lz4
to save space(require '[kafka-metamorphosis.admin :as admin]
'[kafka-metamorphosis.producer :as producer]
'[kafka-metamorphosis.consumer :as consumer])
;; 1. Create admin client
(def admin-client (admin/create-admin-client {:bootstrap-servers "localhost:9092"}))
;; 2. Create topic if it doesn't exist
(admin/create-topic-if-not-exists! admin-client "complete-example"
{:partitions 3})
;; 3. Create producer and send message
(def p (producer/create {:bootstrap-servers "localhost:9092"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"}))
(producer/send! p "complete-example" "key1" "Hello, Kafka!")
;; 4. Create consumer and read message
(def c (consumer/create {:bootstrap-servers "localhost:9092"
:group-id "example-group"
:key-deserializer "org.apache.kafka.common.serialization.StringDeserializer"
:value-deserializer "org.apache.kafka.common.serialization.StringDeserializer"}))
(consumer/subscribe! c ["complete-example"])
(let [records (consumer/poll! c 5000)]
(doseq [record records]
(println "Received:" (:key record) "->" (:value record))))
;; 5. Clean up resources
(producer/close! p)
(consumer/close! c)
(admin/close! admin-client)
With these functions, you can easily manage Kafka topics using an idiomatic Clojure interface, transforming administration complexity into functional simplicity.
Can you improve this documentation? These fine people already did:
Caio Henrique & Caio ClavicoEdit on GitHub
cljdoc builds & hosts documentation for Clojure/Script libraries
Ctrl+k | Jump to recent docs |
← | Move to previous article |
→ | Move to next article |
Ctrl+/ | Jump to the search field |