Tools for administering or just interacting with a Kafka cluster. Wraps the `AdminClient` API, replacing the Scala admin APIs. Like the underlying `AdminClient` API, this namespace is subject to change and should be considered to be of alpha stability.
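A minimal sketch of the intended usage, assuming `->AdminClient` and `list-topics` from this namespace's public API (verify the names against the namespace before relying on them):

```clojure
(require '[jackdaw.admin :as admin])

;; AdminClient implements Closeable, so with-open tears it down.
(with-open [client (admin/->AdminClient {"bootstrap.servers" "localhost:9092"})]
  ;; list the topics visible to this cluster
  (admin/list-topics client))
```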
Clojure wrapper to Kafka's consumer and producer APIs.

The consumers and producers are the basis for streams and many other use cases. They can be used to send messages directly to, or read messages from, topics. There are also some facilities for polling and transactions.

See `jackdaw.client.*` for some add-ons atop this API.
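A round-trip sketch through the producer and consumer wrappers. Topic maps carry the serdes, and the config maps use Kafka's own string keys; treat the exact map shapes as assumptions:

```clojure
(require '[jackdaw.client :as jc]
         '[jackdaw.serdes :as js])

(def topic
  {:topic-name "greetings"
   :key-serde (js/string-serde)
   :value-serde (js/string-serde)})

;; produce! returns a future-like result; deref to wait for the ack
(with-open [p (jc/producer {"bootstrap.servers" "localhost:9092"} topic)]
  @(jc/produce! p topic "key-1" "hello"))

;; subscribe returns the consumer, so it threads cleanly
(with-open [c (-> (jc/consumer {"bootstrap.servers" "localhost:9092"
                                "group.id" "greeter"}
                               topic)
                  (jc/subscribe [topic]))]
  (jc/poll c 1000))
```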
Extras for `jackdaw.client` for treating topics as seqs of records. Pretty handy for testing.
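A sketch assuming this is the `jackdaw.client.log` namespace and that its `log` function takes a subscribed consumer plus a polling interval and returns a lazy seq of datafied records:

```clojure
(require '[jackdaw.client :as jc]
         '[jackdaw.client.log :as jcl]
         '[jackdaw.serdes :as js])

(def topic
  {:topic-name "greetings"
   :key-serde (js/string-serde)
   :value-serde (js/string-serde)})

(with-open [c (-> (jc/consumer {"bootstrap.servers" "localhost:9092"
                                "group.id" "log-demo"}
                               topic)
                  (jc/subscribe [topic]))]
  ;; take what you need; the seq polls lazily as it is consumed
  (doseq [r (take 10 (jcl/log c 200))]
    (println (:key r) "->" (:value r))))
```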
Extras for `jackdaw.client` which help you define record partitioning schemes as part of a topic's configuration.

The partitioning API provided by Kafka, like the serdes API, leaves a lot to be desired when trying to interop from Clojure. You have to define an `org.apache.kafka.clients.producer.Partitioner` class implementation with a 0-arity constructor, and you include the name of that Partitioner class in your producer options. This seems to have been done so that the Partitioner can have access to Kafka internal state about the cluster, from which to read partition count and related data. But this pretty soundly defeats Clojure's idioms of avoiding class generation wherever possible and using instance parameterization.

The `Producer` (and `Consumer`) APIs, however, do expose `.partitionsFor`, a way to interrogate a topic to understand how many partitions it contains.

This namespace defines a mechanism by which clients can define "defaulting" behavior, both for record keys and for record partitioning.

Let's say I want to specify that my topic is always partitioned by some field of the records on the topic. It would be convenient to let a (thin) framework handle that. Likewise, it would be convenient to define, as normal Clojure functions, the computation by which I wish to assign records to partitions, rather than having to code up a custom class.

This namespace provides both capabilities via an extended `#'->ProducerRecord`, and provides a `#'produce!` identical to that in `jackdaw.client` but backed by the partitioning machinery.
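A sketch of the pattern this enables. The topic-config keys `:key-fn` and `:partition-fn` are illustrative names for the defaulting hooks; check the namespace for the exact keys it reads:

```clojure
(require '[jackdaw.client :as jc]
         '[jackdaw.client.partitioning :as part]
         '[jackdaw.serdes :as js])

(def user-events
  {:topic-name "user-events"
   :key-serde (js/edn-serde)
   :value-serde (js/edn-serde)
   ;; default the record key from a field of the value
   :key-fn :user-id
   ;; an ordinary Clojure function instead of a Partitioner class
   :partition-fn (fn [topic-name key value partition-count]
                   (mod (hash key) partition-count))})

(with-open [p (jc/producer {"bootstrap.servers" "localhost:9092"} user-events)]
  ;; key and partition are derived from the topic config
  @(part/produce! p user-events {:user-id 42 :event :login}))
```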
This namespace aims to provide `->T`, `(datafy T)`, and `data->T` as a round-tripping of Kafka's (client) record types.

Note that for some types, particularly Kafka's `-Result` types, no `->T` constructors are provided, as there are no consumers within the Kafka API for these records; they are merely packed results.

For compatibility with Clojure before 1.10.0, a `datafy` function is provided. On 1.10 or after, it simply defers to `clojure.datafy/datafy`, but before 1.10 it acts as a backport thereof.
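A sketch of the round trip; the exact arities and the shape of the datafied map are assumptions consistent with the description above:

```clojure
(require '[jackdaw.data :as jd])

;; build the Kafka record type from plain Clojure data
(def record
  (jd/->ProducerRecord {:topic-name "greetings"} "key-1" "hello"))

;; and back to plain Clojure data
(jd/datafy record)
;; => something like {:topic-name "greetings", :key "key-1", :value "hello", ...}
```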
Implements string and EDN serdes (serializer/deserializer). This is the public API for `jackdaw.serdes`.
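The two serdes this namespace describes, used wherever a topic config expects serde instances (constructor names assumed from the public API):

```clojure
(require '[jackdaw.serdes :as js])

(def topic
  {:topic-name "events"
   :key-serde (js/string-serde)   ; keys as plain UTF-8 strings
   :value-serde (js/edn-serde)})  ; values round-tripped through EDN
```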
DEPRECATION NOTICE: This namespace is deprecated and will soon be removed. Please use `jackdaw.serdes.avro.confluent`.

Generating Serdes mapping Clojure <-> Avro.

The intentional API of this namespace has three main features: `SchemaCoercion`, the intentional type registry (of which `#'+base-schema-type-registry+` is an example), and `#'serde`.

`serde` is the primary entry point to this namespace for users. It's a function of a schema-registry configuration, a schema type registry, and a serde configuration to be instantiated.

The intent is that an end user will `partial` the `serde` function with their schema registry details and desired type registry, and use the `partial`'d function as an entry in a registry as used by `jackdaw.serdes/serde`. This allows `serdes` and `serde` to be agnostic to application or environment specific configuration details.

But what's this type registry?

Apache Avro "logical types" are a tool for annotating fields in an Avro record as having some complex interpretation beyond their serialized format. The `type-registry`, for the purposes of the `serde` function, is a mapping of addresses to functions which, when invoked, build and return a `SchemaCoercion` instance.

When a Serde is instantiated, a stack of `SchemaCoercion` helpers is built which will, given a simply deserialized Avro record, walk its tree coercing it to Clojure types as defined by the `SchemaCoercion` helpers.

The `SchemaCoercion` stack is built by statically inspecting the parsed Avro schema, and using the type (if any) and potentially the logical type to select a handler in the `type-registry` which will, given a function with which to recurse and the schema of that node, build and return a `SchemaCoercion` handler.

This registry pattern is deliberately chosen so that Avro coercion can be customized by the user. As an example, the `+UUID-type-registry+` is included, which defines a mapping from two different logical UUID refinements of the binary string type to an appropriate handler.

A user who wanted to opt into these handlers could simply call `serde` with `(merge +base-schema-type-registry+ +UUID-type-registry+)`.

Users are HIGHLY encouraged to use the `+base-schema-type-registry+` as the base for their type registries, as it defines sane handling for all of Avro's fundamental types and most of its compounds.
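A sketch of the partialling pattern described above. The argument order follows the docstring (schema-registry configuration, type registry, serde configuration); the config keys shown are assumptions for illustration:

```clojure
(require '[jackdaw.serdes.avro :as avro])

(def ->avro-serde
  (partial avro/serde
           {:avro.schema-registry/url "http://localhost:8081"}
           ;; opt into the UUID handlers on top of the base registry
           (merge avro/+base-schema-type-registry+
                  avro/+UUID-type-registry+)))

;; environment-agnostic code can now instantiate serdes from a serde
;; configuration alone ("user-value.avsc" is a hypothetical schema file)
(def value-serde
  (->avro-serde {:avro/schema (slurp "user-value.avsc")
                 :key? false}))
```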
Helpers for talking to one of Confluent's Avro schema registries.
DEPRECATION NOTICE: This namespace is deprecated. Please use `jackdaw.serdes/edn-serde`. The behavior of the new EDN serde is different: it does not print the trailing newline.

Implements an EDN SerDes (Serializer/Deserializer).
Implements an EDN SerDes (Serializer/Deserializer).
Implements a Fressian SerDes (Serializer/Deserializer).
Implements a JSON SerDes (Serializer/Deserializer).
Implements a Confluent JSON Schema Registry SerDes (Serializer/Deserializer).
Helper function for creating serdes.
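A sketch assuming this helper is `jackdaw.serdes.resolver/serde-resolver`: a factory that closes over environment details and turns declarative serde descriptions into live serde instances:

```clojure
(require '[jackdaw.serdes.resolver :as resolver])

(def resolve-serde
  (resolver/serde-resolver :schema-registry-url "http://localhost:8081"))

;; descriptions name the serde by keyword rather than constructing it
(def value-serde
  (resolve-serde {:serde-keyword :jackdaw.serdes.edn/serde}))
```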
Kafka Streams protocols.
Protocol for a configurable thing.
Clojure wrapper to Kafka Streams.
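A minimal topology sketch using the builder API; the function names (`streams-builder`, `kstream`, `map`, `to`, `kafka-streams`, `start`) reflect my understanding of the public interface, and the topic-config shape is an assumption:

```clojure
(require '[clojure.string :as str]
         '[jackdaw.streams :as j]
         '[jackdaw.serdes :as js])

(def in  {:topic-name "words"
          :key-serde (js/string-serde) :value-serde (js/string-serde)})
(def out {:topic-name "shouted-words"
          :key-serde (js/string-serde) :value-serde (js/string-serde)})

(def builder (j/streams-builder))

;; read from `in`, upper-case each value, write to `out`
(-> (j/kstream builder in)
    (j/map (fn [[k v]] [k (str/upper-case v)]))
    (j/to out))

(def app
  (j/kafka-streams builder {"application.id" "shouter"
                            "bootstrap.servers" "localhost:9092"}))

(j/start app)
```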
FIXME
Clojure wrapper to Kafka Streams.
Wrappers for the Java 'lambda' functions.
Mocks for testing Kafka Streams.
Kafka Streams protocols.
A test-machine executes sequences of test commands.

Test machines can be constructed to operate against a variety of targets. For example:

- a local development Kafka cluster
- a mock topology processor
- a cluster shared with other users

In each of these cases, as test authors, we typically want to do the same type of thing: inject a bunch of events into the system, wait until the system under test has finished processing, see what comes out the other end, and check that it looks good.

But the mechanism by which data is injected and observed is different in each case. The test-machine exists so that the author doesn't care. The exact same test (or scenario) can be executed against a mock topology processor, a Kafka cluster running on localhost, or (via smokin or the rest-proxy) a remote Kafka cluster shared with other users.
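A sketch of a command sequence. The transport setup and the `:write!`/`:watch` command shapes are assumptions modelled on jackdaw's test API; the same commands would run unchanged against a mock or a remote cluster by swapping the transport:

```clojure
(require '[jackdaw.test :as jt]
         '[jackdaw.test.transports :as trns]
         '[jackdaw.serdes :as js])

(def input-topic  {:topic-name "input"
                   :key-serde (js/edn-serde) :value-serde (js/edn-serde)})
(def output-topic {:topic-name "output"
                   :key-serde (js/edn-serde) :value-serde (js/edn-serde)})

(with-open [machine (jt/test-machine
                     (trns/transport {:type :kafka
                                      :config {"bootstrap.servers" "localhost:9092"}
                                      :topics {"input" input-topic
                                               "output" output-topic}}))]
  (jt/run-test machine
               [;; inject an event
                [:write! "input" {:id 1 :greeting "hello"}]
                ;; wait until it shows up on the other end
                [:watch (fn [journal]
                          (->> (get-in journal [:topics "output"])
                               (some #(= 1 (get-in % [:value :id])))))]]))
```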
A test machine executor can be built by composing the execution wrappers defined here.