Kafka Avro Serde for Clojure.
Let's define a GenericAvroSerde and a Clojure Serde from this project:
(require '[piotr-yuxuan.slava.config :as config]
         '[piotr-yuxuan.slava :as slava])

(import '(io.confluent.kafka.schemaregistry.client CachedSchemaRegistryClient)
        '(org.apache.avro Schema SchemaBuilder SchemaBuilder$NamespacedBuilder SchemaBuilder$RecordBuilder SchemaBuilder$FieldAssembler)
        '(org.apache.avro.generic GenericData$Record)
        '(io.confluent.kafka.streams.serdes.avro GenericAvroSerde))

(def schema-registry-url "mock://")
(def schema-registry-capacity 128)
(def schema-registry (CachedSchemaRegistryClient. schema-registry-url schema-registry-capacity))
(def avro-config {"schema.registry.url" schema-registry-url})

(def ^Schema record-schema
  (-> (SchemaBuilder/builder)
      ^SchemaBuilder$NamespacedBuilder (.record "Record")
      ^SchemaBuilder$RecordBuilder (.namespace "piotr-yuxuan.slava.test")
      ^SchemaBuilder$FieldAssembler .fields
      (.name "field") .type .intType .noDefault
      ^GenericData$Record .endRecord))

(def avro-serde
  (doto (GenericAvroSerde. schema-registry)
    (.configure avro-config (boolean (not :key)))))

(def clojure-serde
  (doto (slava/clojure-serde schema-registry)
    (.configure (merge config/opinionated avro-config)
                (boolean (not :key)))))
We may now use them:
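(import '(org.apache.avro.generic GenericRecordBuilder))

;; Illustrative topic name (an assumption; any name should work here).
(def topic "topic")

;; Serialise a Clojure map with the Clojure serde, read it back with the Avro serde.
(->> {:field (int 1)}
     (.serialize (.serializer clojure-serde) topic)
     (.deserialize (.deserializer avro-serde) topic))
;; => (.build (.set (GenericRecordBuilder. record-schema) "field" (int 1)))

;; Serialise an Avro record with the Avro serde, read it back with the Clojure serde.
(->> (.build (.set (GenericRecordBuilder. record-schema) "field" (int 1)))
     (.serialize (.serializer avro-serde) topic)
     (.deserialize (.deserializer clojure-serde) topic))
;; => {:field (int 1)}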
The Clojure Serde returns an idiomatic Clojure map instead of a record. See ./test/piotr_yuxuan/slava_test.clj for further examples.
This library will not reach version 1.0.0 before more extensive tests have been written, and more detailed documentation has been added.
The serialiser and deserialiser provided by this library wrap their counterparts from io.confluent.kafka.serializers. Given a Confluent config and a schema registry instance, a Clojure serde is built with:
(require '[piotr-yuxuan.slava.config :as config]
         '[piotr-yuxuan.slava :as slava])

(import '(io.confluent.kafka.schemaregistry.client CachedSchemaRegistryClient)
        '(org.apache.kafka.common.serialization Serde))

(def schema-registry-url "mock://")
(def schema-registry-capacity 128)
(def schema-registry (CachedSchemaRegistryClient. schema-registry-url schema-registry-capacity))
(def avro-config {"schema.registry.url" schema-registry-url})

(def ^Serde clojure-serde
  (doto (slava/clojure-serde schema-registry)
    (.configure (merge config/default avro-config)
                (boolean (not :key)))))
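The second argument to .configure is the isKey flag of Kafka's Serde.configure(Map, boolean). The snippets here spell it (boolean (not :key)), which reads as "this is not a key serde" and evaluates to false, because every keyword is truthy. A minimal, dependency-free sketch of the idiom follows; the var names are hypothetical and only serve the illustration:

```clojure
;; Every keyword is truthy, so (not :key) is false.
(def is-key? (boolean (not :key))) ; hypothetical name, for illustration only
is-key? ;; => false

;; The symmetric spelling for a key serde would be:
(def is-key-for-key-serde? (boolean :key)) ; hypothetical name
is-key-for-key-serde? ;; => true
```

Passing a literal false or true would behave identically; the keyword form only documents intent at the call site.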
To provide enough flexibility to adapt to most needs, you have complete access to the config. The value config/default adds no opinionated behaviour: an Avro enum stays an instance of GenericData$EnumSymbol; an Avro string may be decoded as an instance of org.apache.avro.util.Utf8, which implements CharSequence and is different from a java.lang.String; field keys are decoded as strings, not keywords; and so on.
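To make these raw Avro types concrete, here is a small sketch that uses only the Avro library itself (no serde involved, so it says nothing about how this library is implemented). It assumes Avro is on the classpath; the schema and var names are made up for the illustration:

```clojure
(import '(org.apache.avro Schema SchemaBuilder)
        '(org.apache.avro.generic GenericData$EnumSymbol)
        '(org.apache.avro.util Utf8))

;; An Avro string value: a CharSequence, but not a java.lang.String.
(def utf8-value (Utf8. "hello"))
(instance? CharSequence utf8-value) ;; => true
(instance? String utf8-value)       ;; => false
(= "hello" utf8-value)              ;; => false
(= "hello" (str utf8-value))        ;; => true

;; An Avro enum value: a GenericData$EnumSymbol, not a keyword.
(def ^Schema enum-schema
  (-> (SchemaBuilder/builder)
      (.enumeration "enum")
      (.symbols (into-array String ["A" "B" "C"]))))

(def enum-value (GenericData$EnumSymbol. enum-schema "A"))
(keyword? enum-value) ;; => false
(str enum-value)      ;; => "A"
```

With such values, equality checks against plain strings and keyword lookups do not behave as Clojure code usually expects.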
An opinionated config makes different choices to provide a more Clojuresque look and feel. For example, consider this schema:
(def EnumField
  (-> (SchemaBuilder/builder)
      (.enumeration "enum")
      (.symbols (into-array String ["A" "B" "C"]))))

(def MyRecord
  (-> (SchemaBuilder/builder)
      ^SchemaBuilder$NamespacedBuilder (.record "RecordSchema")
      ^SchemaBuilder$FieldAssembler .fields
      (.name "intField") .type .intType .noDefault
      (.name "booleanField") .type .booleanType .noDefault
      (.name "enumField") (.type EnumField) .noDefault
      ^GenericData$Record .endRecord))
If we define a Serde with config/opinionated:
(def ^Serde clojure-serde
  (doto (slava/clojure-serde schema-registry)
    (.configure (merge config/opinionated avro-config)
                (boolean (not :key)))))
then the deserialisation / serialisation logic will be equivalent to:
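;; Assumption: `csk` aliases camel-snake-kebab.core.
(require '[camel-snake-kebab.core :as csk])
(import '(java.util Map)
        '(org.apache.avro.generic GenericData$EnumSymbol GenericData$Record GenericRecordBuilder))

;; Avro record -> Clojure map with kebab-case keyword keys and keyword enums.
(defn ^Map my-record-decoder
  [^GenericData$Record record]
  {:int-field (.get record "intField")
   :boolean-field (.get record "booleanField")
   :enum-field (csk/->kebab-case-keyword (.toString (.get record "enumField")))})

;; Clojure map -> Avro record, turning keyword enums back into enum symbols.
(defn ^GenericData$Record my-record-encoder
  [^Map m]
  (.build (doto (GenericRecordBuilder. MyRecord)
            (.set "intField" (get m :int-field))
            (.set "booleanField" (get m :boolean-field))
            (.set "enumField" (GenericData$EnumSymbol. EnumField (csk/->SCREAMING_SNAKE_CASE_STRING (get m :enum-field)))))))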
These coders are compiled only once, on first use, and reused for every subsequent invocation.
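The compile-once, reuse-afterwards behaviour can be pictured with plain memoisation. The sketch below is not this library's implementation; compile-decoder, decoder-for, and compile-count are hypothetical names, and the "compilation" is a stand-in that only counts how often it runs:

```clojure
;; Counts how many times the (pretend) expensive compilation runs.
(def compile-count (atom 0))

(defn compile-decoder
  "Hypothetical stand-in for building a schema-specific decoder."
  [schema-name]
  (swap! compile-count inc)
  (fn [record] (assoc record :decoded-with schema-name)))

;; memoize caches the decoder per schema name, so each one is built once.
(def decoder-for (memoize compile-decoder))

((decoder-for "RecordSchema") {:int-field 1})
;; => {:int-field 1, :decoded-with "RecordSchema"}
((decoder-for "RecordSchema") {:int-field 2})
;; => {:int-field 2, :decoded-with "RecordSchema"}

@compile-count ;; => 1
```

The first lookup pays the construction cost; every later call for the same schema only pays for the decoding itself.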
See GitHub issues.
For a more complete Clojure API around Kafka, see FundingCircle/jackdaw.