Kafka Metamorphosis provides a powerful and intuitive schema validation system that allows you to define, register, and validate message structures without the complexity of clojure.spec. Our custom implementation offers flexibility, performance, and excellent error reporting.
(require '[kafka-metamorphosis.schema :as schema])
;; Define a simple schema
(schema/defschema :user-schema
{:user-id int?
:name string?
:email string?
:active boolean?})
;; Validate a message
(schema/validate-message
{:user-id 123 :name "John Doe" :email "john@example.com" :active true}
:user-schema) ; => true
;; Get detailed validation errors
(schema/explain-validation
{:user-id "invalid" :name "John"} ; missing email, wrong type
:user-schema)
; => {:valid? false :errors [...]}
(schema/defschema :schema-name
{:field-name predicate-function
:another-field another-predicate})
(schema/defschema :product-schema
{:product-id int?
:name string?
:price double?
:in-stock boolean?
:category string?})
;; Returns true/false
(schema/validate-message message :schema-name)
;; Get detailed explanation
(schema/explain-validation message :schema-name)
;; Success
;; :user-schema declares :user-id, :name, :email and :active; a message must
;; carry all four (with matching types) to validate.
(schema/validate-message
  {:user-id 123 :name "John" :email "john@example.com" :active true}
  :user-schema)
; => true
;; Failure
(schema/validate-message {:user-id "invalid"} :user-schema)
; => false
;; Detailed explanation
(schema/explain-validation {:user-id "invalid"} :user-schema)
; => {:valid? false
; :errors [{:field "user-id"
; :error "Field validation failed"
; :value "invalid"
; :expected #function[clojure.core/int?]}]
; :message {...}
; :schema-id :user-schema}
int? ; Integer numbers
double? ; Double numbers
string? ; Strings
boolean? ; Boolean values
keyword? ; Keywords
map? ; Maps
vector? ; Vectors
seq? ; Sequences
;; Accept one of specific values
(schema/one-of :active :inactive :suspended)
;; Minimum collection size
(schema/min-count 1) ; At least 1 item
(schema/min-count 5) ; At least 5 items
;; Maximum collection size
(schema/max-count 10) ; At most 10 items
;; Map with specific key-value types
(schema/map-of keyword? string?) ; keyword keys, string values
(schema/map-of string? any?) ; string keys, any values
You can create custom validation functions:
;; Email validation
(defn email?
  "Truthy when `s` is a string of the loose form local@domain.tld.
  Returns the matched string on success, nil when a string does not match,
  and false when `s` is not a string."
  [s]
  (if (string? s)
    (re-matches #".+@.+\..+" s)
    false))
;; Positive number validation
(defn positive?
  "True when `n` is a number strictly greater than zero; false otherwise
  (including for non-numeric input)."
  [n]
  (if (number? n)
    (< 0 n)
    false))
;; CPF validation (Brazilian tax ID)
(defn valid-cpf?
  "Truthy when `cpf` is a string formatted as NNN.NNN.NNN-NN.
  Only the punctuation/digit layout is checked; check digits are not verified.
  Returns the matched string, nil for a non-matching string, false for a
  non-string."
  [cpf]
  (if (string? cpf)
    (re-matches #"\d{3}\.\d{3}\.\d{3}-\d{2}" cpf)
    false))
;; Use in schema
(schema/defschema :customer-schema
{:customer-id int?
:email email? ; custom predicate
:balance positive? ; custom predicate
:cpf valid-cpf? ; custom predicate
:status (schema/one-of :active :inactive)})
Kafka Metamorphosis provides powerful schema composition features that allow you to create complex schemas by combining simpler ones.
Reference other schemas within your schema definitions:
;; Define base schemas
(schema/defschema :user-details
{:id int?
:name string?
:email string?})
(schema/defschema :address-details
{:street string?
:city string?
:zip-code string?})
;; Reference schemas in other schemas
(schema/defschema :user-profile
{:user (schema/schema-ref :user-details)
:address (schema/schema-ref :address-details)
:bio string?
:created-at string?})
;; Usage
(schema/validate-message
{:user {:id 123 :name "John" :email "john@example.com"}
:address {:street "123 Main St" :city "NYC" :zip-code "10001"}
:bio "Software Developer"
:created-at "2024-01-01"}
:user-profile) ; => true
Use any-of when a field can match any one of several schemas:
;; Define alternative schemas
(schema/defschema :email-contact
{:email string?
:preferred-time string?})
(schema/defschema :phone-contact
{:phone string?
:area-code string?})
(schema/defschema :social-contact
{:platform string?
:handle string?})
;; Field can be any of these contact types
(schema/defschema :user-contact
{:user-id int?
:contact-info (schema/any-of :email-contact :phone-contact :social-contact)
:notes string?})
;; All of these are valid:
(schema/validate-message
{:user-id 123
:contact-info {:email "john@example.com" :preferred-time "morning"}
:notes "Primary contact"}
:user-contact) ; => true
(schema/validate-message
{:user-id 123
:contact-info {:phone "+1-555-1234" :area-code "555"}
:notes "Phone contact"}
:user-contact) ; => true
(schema/validate-message
{:user-id 123
:contact-info {:platform "twitter" :handle "@john_doe"}
:notes "Social media"}
:user-contact) ; => true
Use all-of when a field must satisfy multiple schemas simultaneously:
;; Define partial schemas
(schema/defschema :user-basic
{:id int?
:name string?})
;; Contact fields (email and phone) used as one part of the all-of example.
;; NOTE(review): the id :user-contact is also registered in the any-of example
;; with a different shape (:user-id, :contact-info, :notes). Registering it
;; again here presumably replaces that definition -- confirm, or use a
;; distinct schema id if both examples are evaluated in the same process.
(schema/defschema :user-contact
{:email string?
:phone string?})
(schema/defschema :user-permissions
{:role string?
:permissions [string?]})
;; Field must satisfy all schemas
(schema/defschema :complete-user
{:user-data (schema/all-of :user-basic :user-contact :user-permissions)
:registration-date string?})
;; This message must have all fields from all three schemas
(schema/validate-message
{:user-data {:id 123
:name "John Doe"
:email "john@example.com"
:phone "+1-555-1234"
:role "admin"
:permissions ["read" "write" "delete"]}
:registration-date "2024-01-01"}
:complete-user) ; => true
;; This would fail (missing permissions fields)
(schema/validate-message
{:user-data {:id 123
:name "John Doe"
:email "john@example.com"
:phone "+1-555-1234"}
:registration-date "2024-01-01"}
:complete-user) ; => false
You can combine composition functions for sophisticated validation logic:
;; Define base schemas
(schema/defschema :basic-product
{:id int? :name string? :price number?})
(schema/defschema :digital-product
{:download-url string? :file-size int?})
(schema/defschema :physical-product
{:weight number? :dimensions string?})
(schema/defschema :subscription-product
{:billing-period string? :auto-renew boolean?})
;; Complex composition: product can be digital OR physical,
;; and optionally be a subscription
(schema/defschema :flexible-product
{:basic-info (schema/schema-ref :basic-product)
:product-type (schema/any-of :digital-product :physical-product)
:subscription-info (schema/any-of
:subscription-product
{:subscription boolean?}) ; or just a flag
:metadata map?})
;; Examples of valid products:
;; Digital subscription product
(schema/validate-message
{:basic-info {:id 1 :name "Software License" :price 99.99}
:product-type {:download-url "https://..." :file-size 1024}
:subscription-info {:billing-period "monthly" :auto-renew true}
:metadata {:category "software"}}
:flexible-product) ; => true
;; Physical one-time product
(schema/validate-message
{:basic-info {:id 2 :name "Book" :price 29.99}
:product-type {:weight 0.5 :dimensions "20x15x2cm"}
:subscription-info {:subscription false}
:metadata {:category "books"}}
:flexible-product) ; => true
Composition functions can be nested for even more complex scenarios:
(schema/defschema :advanced-user
{:profile (schema/any-of
;; Simple user (just basic info)
:user-basic
;; Complete user (basic + contact + permissions)
(schema/all-of :user-basic :user-contact :user-permissions)
;; Admin user (basic + contact + permissions + special fields)
(schema/all-of :user-basic
:user-contact
:user-permissions
{:admin-level int?
:last-admin-action string?}))
:account-type (schema/one-of :basic :premium :admin)
:created-at string?})
When using composed schemas, errors are reported with clear field paths:
;; :complete-user validates :user-data with all-of over :user-basic,
;; :user-contact and :user-permissions, so the fields of all three are required.
(schema/explain-validation
{:user-data {:id 123 ; missing name, email, role, permissions
:phone "+1-555-1234"}
:registration-date "2024-01-01"}
:complete-user)
; Will show errors like the following.
; For an any-of field where no alternative matches (not the case for :complete-user):
; {:field "user-data" :error "None of the schemas in any-of matched"}
; Or for all-of (as used by :complete-user), one error per missing field:
; {:field "user-data.name" :error "Field is missing"}
; {:field "user-data.email" :error "Field is missing"}
; {:field "user-data.role" :error "Field is missing"}
; {:field "user-data.permissions" :error "Field is missing"}
(schema/defschema :order-schema
{:order-id string?
:customer {:id int?
:name string?
:email string?
:address {:street string?
:city string?
:zip-code string?
:country string?}}
:total double?
:status (schema/one-of :pending :confirmed :shipped)})
;; Validation
(schema/validate-message
{:order-id "ORD-001"
:customer {:id 123
:name "John Doe"
:email "john@example.com"
:address {:street "123 Main St"
:city "New York"
:zip-code "10001"
:country "USA"}}
:total 99.99
:status :pending}
:order-schema) ; => true
When validation fails in nested structures, you get precise error paths:
(schema/explain-validation
{:order-id "ORD-001"
:customer {:id "invalid" ; error here
:name "John"
:address {:street "123 Main St"
:city "New York"
; zip-code missing ; error here
:country "USA"}}
:total 99.99}
:order-schema)
; Errors will show:
; - "customer.id" - Field validation failed
; - "customer.address.zip-code" - Field is missing
(schema/defschema :cart-schema
{:cart-id string?
:items [{:product-id int? ; Each item must have these fields
:name string?
:price double?
:quantity int?}]
:total double?})
;; Validation
(schema/validate-message
{:cart-id "cart-123"
:items [{:product-id 1 :name "Book" :price 29.99 :quantity 2}
{:product-id 2 :name "Pen" :price 5.99 :quantity 1}]
:total 65.97}
:cart-schema) ; => true
(schema/defschema :user-schema
{:user-id int?
:name string?
:tags [string?] ; Array of strings
:scores [double?] ; Array of doubles
:active boolean?})
;; Validation
(schema/validate-message
{:user-id 123
:name "Alice"
:tags ["premium" "loyal" "verified"]
:scores [95.5 87.2 92.1]
:active true}
:user-schema) ; => true
(schema/explain-validation
{:cart-id "cart-123"
:items [{:product-id 1 :name "Book" :price 29.99 :quantity 2}
{:product-id "invalid" :name "Pen" :quantity 1}]} ; missing price, wrong type
:cart-schema)
; Errors will show:
; - "items[1].product-id" - Field validation failed
; - "items[1].price" - Field is missing
{:valid? false
:errors [{:field "field-name"
:error "Error description"
:value actual-value
:expected predicate-function}]
:message original-message
:schema-id :schema-name}
;; Missing field
{:field "email" :error "Field is missing"}
;; Type validation failed
{:field "user-id" :error "Field validation failed" :value "123" :expected int?}
;; Nested field error
{:field "customer.address.zip-code" :error "Field is missing"}
;; Collection item error
{:field "items[0].price" :error "Field validation failed" :value "invalid"}
(require '[kafka-metamorphosis.core :as km]
'[kafka-metamorphosis.schema :as schema])
;; Define schema
(schema/defschema :user-event-schema
{:event-id string?
:user-id int?
:action string?
:timestamp string?})
;; Send message with validation
(schema/send-schema-message!
"user-events"
{:event-id "evt-123"
:user-id 456
:action "login"
:timestamp "2024-09-01T10:30:00Z"}
:user-event-schema)
;; Consume messages with automatic validation
(schema/consume-schema-messages!
"user-group"
["user-events"]
:user-event-schema)
;; Wrap existing serializers with schema validation
(def validated-serializers
(schema/with-schema-serializers
km/json-serializers
:user-event-schema))
;; Use with producer
(km/create-producer (km/producer-config) validated-serializers)
;; Complete e-commerce order: customer, line items, payment, shipping, totals.
;; `email?` is the custom predicate from the custom-validation example and must
;; be in scope where this form is evaluated.
(schema/defschema :ecommerce-order-schema
  {:order-id string?
   :customer {:id int?
              :name string?
              :email email?
              :tier (schema/one-of :bronze :silver :gold :platinum)}
   :items [{:product-id int?
            :sku string?
            :name string?
            :price double?
            ;; Quantity is a scalar, so the collection-size helper min-count
            ;; does not apply to it; require an integer >= 1 instead.
            :quantity pos-int?
            :category string?}]
   :payment {:method (schema/one-of :credit-card :debit-card :pix :boleto)
             :amount double?
             :installments int?
             :status (schema/one-of :pending :approved :rejected)}
   :shipping {:address {:street string?
                        :city string?
                        :zip-code string?
                        :country string?}
              :method (schema/one-of :standard :express :same-day)
              :cost double?}
   :totals {:subtotal double?
            :shipping double?
            :taxes double?
            :discount double?
            :total double?}
   :timestamps {:created-at string?
                :updated-at string?}
   ;; Free-form annotations: keyword keys mapped to string values.
   :metadata (schema/map-of keyword? string?)})
(schema/defschema :system-event-schema
{:event-id string?
:timestamp string?
:level (schema/one-of :debug :info :warn :error :fatal)
:source string?
:message string?
:context {:request-id string?
:user-id int?
:session-id string?
:ip-address string?}
:metrics {:duration-ms int?
:memory-mb double?
:cpu-percent double?}
:tags (schema/min-count 1)
:metadata (schema/map-of string? any?)})
(schema/defschema :user-profile-schema
{:user-id int?
:username string?
:email email?
:profile {:first-name string?
:last-name string?
:birth-date string?
:phone string?
:avatar-url string?}
:preferences {:language (schema/one-of :en :pt :es :fr)
:timezone string?
:notifications {:email boolean?
:sms boolean?
:push boolean?}
:privacy {:profile-visible boolean?
:email-visible boolean?
:activity-tracking boolean?}}
:addresses [{:type (schema/one-of :home :work :other)
:street string?
:city string?
:state string?
:zip-code string?
:country string?
:default boolean?}]
:social-links (schema/map-of keyword? string?)
:created-at string?
:updated-at string?
:status (schema/one-of :active :inactive :suspended :deleted)})
;; Group related schemas
(defn define-user-schemas []
(schema/defschema :user-profile-schema {...})
(schema/defschema :user-preferences-schema {...})
(schema/defschema :user-activity-schema {...}))
;; Call during application startup
(define-user-schemas)
;; Include version in schema names
(schema/defschema :user-profile-v1-schema {...})
(schema/defschema :user-profile-v2-schema {...})
;; Or use metadata
(schema/defschema :user-profile-schema
^{:version "2.0" :description "User profile with enhanced fields"}
{...})
;; Define common predicates in a separate namespace
(ns myapp.schema.predicates)
;; Loose e-mail shape check: something@something.something.
;; Yields the matched string, nil for a non-matching string, false for a non-string.
(def email?
  (fn [candidate]
    (if (string? candidate)
      (re-matches #".+@.+\..+" candidate)
      false)))
;; True only for fixed-precision integers strictly greater than zero.
(def positive-int?
  (fn [x]
    (if (int? x)
      (< 0 x)
      false)))
;; Truthy when `s` is a string in the canonical lowercase UUID layout
;; (8-4-4-4-12 hexadecimal digits separated by dashes).
;; The previous pattern, [0-9a-f-]{36}, only checked length and alphabet, so it
;; also accepted inputs such as 36 dashes or 36 hex digits without separators.
;; Yields the matched string, nil for a non-matching string, false for a non-string.
(def uuid-string?
  (fn [s]
    (and (string? s)
         (re-matches #"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}" s))))
;; Always handle validation errors gracefully
(defn safe-process-message
  "Runs `process-message` on `message` when it validates against the schema
  registered under `schema-id`. Otherwise logs the validation errors and
  returns a map with :error and :details instead of throwing."
  [message schema-id]
  (if-not (schema/validate-message message schema-id)
    ;; Failure path: fetch the detailed explanation only when it is needed.
    (let [{:keys [errors]} (schema/explain-validation message schema-id)]
      (log/error "Invalid message" {:errors errors})
      {:error "Invalid message format" :details errors})
    (process-message message)))
;; Test both valid and invalid cases
(deftest test-user-schema
  ;; Happy path: a conforming message validates.
  (testing "Valid user message"
    (is (schema/validate-message valid-user-message :user-schema)))
  ;; Failure path: the boolean check and the detailed explanation must agree.
  (testing "Invalid user message"
    (is (false? (schema/validate-message invalid-user-message :user-schema)))
    (let [explanation (schema/explain-validation invalid-user-message :user-schema)]
      (is (false? (:valid? explanation)))
      ;; (seq coll) is the idiomatic non-empty check, preferred over
      ;; (not (empty? coll)).
      (is (seq (:errors explanation))))))
;; Pre-compile complex predicates
;; Pattern is bound once at load time and reused by every call below.
(def compiled-email-regex #".+@.+\..+")

(defn fast-email?
  "Truthy when `s` is a string matching `compiled-email-regex`.
  Returns the matched string, nil for a non-matching string, false for a
  non-string."
  [s]
  (if (string? s)
    (re-matches compiled-email-regex s)
    false))
;; Cache validation results for expensive operations
;; Memo table keyed by [message schema-id]; values are the boolean results of
;; schema/validate-message.
;; NOTE(review): the cache is never evicted and is not cleared when a schema id
;; is re-registered -- bound or reset it if schemas change at runtime.
(def validation-cache (atom {}))

(defn cached-validate
  "Returns the result of validating `message` against `schema-id`, reusing a
  previously computed result for the same pair when one exists.

  The lookup uses `find` rather than testing the cached value for truthiness:
  a cached `false` (invalid message) is a legitimate hit and must not trigger
  re-validation."
  [message schema-id]
  (let [cache-key [message schema-id]]
    (if-let [hit (find @validation-cache cache-key)]
      (val hit)
      (let [result (schema/validate-message message schema-id)]
        (swap! validation-cache assoc cache-key result)
        result))))
- (defschema schema-id spec-map) - Define and register a schema
- (get-schema schema-id) - Retrieve a registered schema
- (list-schemas) - List all registered schema IDs
- (validate-message message schema-id) - Validate a message (returns boolean)
- (explain-validation message schema-id) - Get detailed validation explanation
- (one-of & values) - Accept one of the specified values
- (min-count n) - Minimum collection size
- (max-count n) - Maximum collection size
- (map-of key-pred value-pred) - Map with specific key-value predicates
- (schema-ref schema-id) - Reference another registered schema
- (any-of & schema-ids) - Match any one of the provided schemas (OR logic)
- (all-of & schema-ids) - Match all of the provided schemas (AND logic)
- (send-schema-message! topic message schema-id) - Send validated message
- (consume-schema-messages! group topics schema-id) - Consume with validation
- (with-schema-serializers serializers schema-id) - Wrap serializers
- (with-schema-deserialization deserializers schema-id) - Wrap deserializers

🦋 Happy metamorphosis with validated schemas!
Can you improve this documentation?Edit on GitHub
cljdoc builds & hosts documentation for Clojure/Script libraries
Ctrl+k | Jump to recent docs |
← | Move to previous article |
→ | Move to next article |
Ctrl+/ | Jump to the search field |