You can now define topic-specific schemas using the :topic-name/schema-id
convention:
;; Schema for 'users' topic
(schema/defschema :users/default {:user-id int? :name string?})
(schema/defschema :users/profile {:user-id int? :name string? :bio string?})
;; Direct topic schema
(schema/defschema :orders {:order-id string? :total double?})
;; Auto-detect schema based on topic name
(schema/validate-message-for-topic message "users") ; uses :users/default
(schema/explain-validation-for-topic message "users") ; detailed explanation
Both producer and consumer now support automatic validation through the :schemas
configuration:
;; Producer with auto-validation
(def producer (producer/create {:bootstrap-servers "localhost:9092"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:schemas true})) ; ← ENABLES AUTOMATIC VALIDATION!
;; Consumer with auto-validation
(def consumer (consumer/create {:bootstrap-servers "localhost:9092"
:group-id "my-group"
:key-deserializer "org.apache.kafka.common.serialization.StringDeserializer"
:value-deserializer "org.apache.kafka.common.serialization.StringDeserializer"
:auto-offset-reset "earliest"
:schemas true})) ; ← ENABLES AUTOMATIC VALIDATION!
;; Normal usage - validation happens automatically
(producer/send! producer "users" "key" {:user-id 123 :name "John"})
(consumer/subscribe! consumer ["users"])
(let [records (consumer/poll! consumer 1000)]
(doseq [record records]
(println "Consumed:" (:value record)))) ; Warnings shown for invalid messages
(producer/close! producer)
(consumer/close! consumer)
;; Auto-detect schema based on topic (STRICT - requires schema to exist)
{:schemas true}
;; Specific schema for all topics
{:schemas :my-schema}
;; Per-topic mapping
{:schemas {"users" :users/default
"orders" :orders/default}}
Important: When using {:schemas true}
, validation is strict - if no schema is found for a topic, messages will be rejected with a clear warning.
(require '[kafka-metamorphosis.producer :as producer])
(require '[kafka-metamorphosis.consumer :as consumer])
(require '[kafka-metamorphosis.schema :as schema])
;; 1. Define schemas by topic
(schema/defschema :users/default
{:user-id int?
:name string?
:email string?})
(schema/defschema :orders/default
{:order-id string?
:user-id int?
:total double?
:status (schema/one-of :pending :confirmed :shipped)})
;; 2. Create producer with automatic validation
(def producer (producer/create {:bootstrap-servers "localhost:9092"
:key-serializer "org.apache.kafka.common.serialization.StringSerializer"
:value-serializer "org.apache.kafka.common.serialization.StringSerializer"
:schemas true}))
;; 3. Send messages - shows warnings for invalid data and blocks sending!
(producer/send! producer "users" "user-123"
{:user-id 123 :name "John" :email "john@test.com"})
;; Invalid message - shows warning and returns nil (doesn't send)
(let [result (producer/send! producer "users" "invalid" {:user-id "not-a-number"})]
(if result
(println "Message sent successfully")
(println "Message was rejected due to validation failure")))
;; ⚠️ Schema validation failed for topic 'users'
;; Message will NOT be sent to Kafka.
;; 4. Create consumer with automatic validation
(def consumer (consumer/create {:bootstrap-servers "localhost:9092"
:group-id "my-consumer-group"
:key-deserializer "org.apache.kafka.common.serialization.StringDeserializer"
:value-deserializer "org.apache.kafka.common.serialization.StringDeserializer"
:auto-offset-reset "earliest"
:schemas true}))
;; 5. Consume messages - shows warnings for invalid data and filters them out!
(consumer/subscribe! consumer ["users"])
(let [records (consumer/poll! consumer 5000)]
(doseq [record records]
(println "Consumed:" (:value record))))
;; ⚠️ Schema validation failed for consumed message from topic 'users'
;; Message will NOT be processed.
;; Only valid messages will be returned in the poll! result
(consumer/close! consumer)
(producer/close! producer)
;; List all schemas
(schema/list-schemas)
;; => (:users/default :orders/default :notifications ...)
;; Schemas for specific topic
(schema/list-schemas-for-topic "users")
;; => (:users/default :users/profile)
;; Get schema for topic
(schema/get-schema-for-topic "users") ; searches :users/default or :users
(get-schema-for-topic topic-name)
- Get schema for topic(list-schemas-for-topic topic-name)
- List topic's schemas(validate-message-for-topic message topic)
- Validate against topic schema(explain-validation-for-topic message topic)
- Detailed explanation(producer/create config)
- Supports :schemas
in configuration(producer/send! producer topic key value)
- Automatic validation with rejection of invalid messages(producer/send-async! producer topic key value callback)
- Automatic validation with rejection of invalid messages(consumer/create config)
- Supports :schemas
in configuration(consumer/poll! consumer timeout)
- Automatic validation with filtering of invalid messages{:schemas true}
- Auto-detect based on topic name (strict - requires schema){:schemas :schema-id}
- Specific schema for all topics{:schemas {"topic1" :schema1 "topic2" :schema2}}
- Per-topic mapping (strict - requires mapping)When schema validation is enabled, the system enforces strict requirements:
{:schemas true}
):topic/default
or :topic
){:schemas {...}}
);; Schema mapping with only specific topics
(def producer (producer/create {:schemas {"users" :users/default
"orders" :orders/default}}))
;; ✅ Will succeed - topic has mapping and valid data
(producer/send! producer "users" {:user-id 123 :name "John"})
;; ❌ Will be rejected - topic not in mapping
(producer/send! producer "products" {:id 1 :name "Widget"})
;; ⚠️ No schema mapping found for topic 'products'
;; Available mappings: (users orders)
;; Message will NOT be sent to Kafka.
;; ❌ Will be rejected - topic has mapping but invalid data
(producer/send! producer "users" {:user-id "invalid"})
;; ⚠️ Schema validation failed for topic 'users'
;; Message will NOT be sent to Kafka.
See practical examples in:
src/kafka_metamorphosis/exemples/topic_schema_examples.clj
Run in REPL:
(require '[kafka-metamorphosis.exemples.topic-schema-examples :as examples])
(examples/run-all-examples)
:schemas true
and validation happens automatically:topic-name/default
or :topic-name
(producer/send!)
and (consumer/poll!)
styleSchema validation is now completely transparent, automatic, and protective! 🚀
Can you improve this documentation?Edit on GitHub
cljdoc builds & hosts documentation for Clojure/Script libraries
Ctrl+k | Jump to recent docs |
← | Move to previous article |
→ | Move to next article |
Ctrl+/ | Jump to the search field |