dvlopt.kafka

Start here.

This library aims to be Clojure idiomatic while not being too smart about wrapping the Java libraries, so that upgrading
will not be laborious in the future.

It is organized into specific namespaces. Producers (`dvlopt.kafka.out`) produce records, consumers (`dvlopt.kafka.in`)
consume records, and administrators can alter Kafka (`dvlopt.kafka.admin`). For anything related to Kafka Streams, see the
`dvlopt.kstreams` namespace.


Records
=======

A record is a map containing at least a ::topic. It might also hold :

  ::headers
    List of [Key Value] pairs where `Key` is an arbitrary string and `Value` is an arbitrary byte array which can be missing.
    Keys can appear more than once.

  ::offset
    Offset of the record in the topic-partition.

  ::key
    Key of the record, serialized or deserialized. Can be nil.

  ::timestamp
    Timestamp of the record.

  ::timestamp.type
    A timestamp can refer to when the record was created (:create) or appended to the log by the broker (:log-append).

  ::value
    Value of the record, serialized or deserialized. Can be nil.

Obviously, the offset cannot be decided by the user when sending the record. Headers allow records to have metadata
clearly distinct from the value. They play the same role as in other protocols such as HTTP.
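
For instance, a consumed record, once deserialized, might look like the following map (values are purely illustrative
and keys are shown fully qualified) :

Ex. {:dvlopt.kafka/topic          "my-topic"
     :dvlopt.kafka/offset         42
     :dvlopt.kafka/timestamp      1549108000000
     :dvlopt.kafka/timestamp.type :create
     :dvlopt.kafka/headers        [["trace-id" (.getBytes "abc-123")]]
     :dvlopt.kafka/key            "user-42"
     :dvlopt.kafka/value          {:click 1}}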


Connecting to nodes
===================

Opening a resource such as a producer requires a list of nodes (i.e. a list of [host port]). If not provided, the library
tries to reach [["localhost" 9092]]. When described, a node is a map containing ::host, ::port, ::id (numerical
identification), and ::rack when one is assigned. A replica node also contains the attribute ::synced?.
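
For instance (host, port, and node attributes are purely illustrative) :

Ex. ;; A list of nodes supplied for connecting.
    [["localhost" 9092]
     ["another-broker" 9092]]

    ;; A node as described by the library.
    {:dvlopt.kafka/host "localhost"
     :dvlopt.kafka/port 9092
     :dvlopt.kafka/id   1001
     :dvlopt.kafka/rack "rack-1"}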


Ser/de
======

Within Kafka, records are kept as opaque byte arrays. When sending records, keys and values need to be serialized;
when consuming them, deserialized.

A serializer is a function mapping some data to a byte array or nil. It is needed for producing records.

Ex. (fn serializer [data metadata]
      (some-> data
              nippy/freeze))

A deserializer does the opposite, it maps a byte array (or nil) to a value. It is needed for consuming records.

Ex. (fn deserializer [ba metadata]
      (some-> ba
              nippy/thaw))


Both types of functions are often used throughout the library. The provided metadata might help the user decide how to
ser/de the data. It is a map containing the ::topic involved as well as ::headers when there are some.

Built-in deserializers and serializers are available at `deserializers` and `serializers` respectively. Because ser/de
is so common, a keyword specifying one of the built-in functions can be provided instead :

  :boolean
  :byte-array
  :byte-buffer
  :double
  :integer
  :keyword
  :long
  :string

If a ser/de is not provided by the user when needed, the default is :byte-array.
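
For instance, here is a sketch of an option map mixing a built-in ser/de (selected by keyword) with custom functions.
The option keys shown are the ones documented for state stores and tables elsewhere in this documentation :

Ex. {:dvlopt.kafka/serializer.key     :string
     :dvlopt.kafka/serializer.value   (fn my-serializer [data _metadata]
                                        (some-> data
                                                nippy/freeze))
     :dvlopt.kafka/deserializer.key   :string
     :dvlopt.kafka/deserializer.value (fn my-deserializer [ba _metadata]
                                        (some-> ba
                                                nippy/thaw))}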


Time
====

Throughout this library, all time intervals, such as timeouts, are expressed as [Duration Unit] where
Duration is coerced to an integer and Unit is one of :

  :nanoseconds
  :microseconds
  :milliseconds
  :seconds
  :minutes
  :hours
  :days

Ex. [5 :seconds]

dvlopt.kafka.-interop.java

Convert Clojure data structures to Java objects.


dvlopt.kafka.admin

Kafka administration.


Functions accepting a timeout rely on the default request timeout of the admin client when none
is provided. Time intervals are described in `dvlopt.kafka`.

dvlopt.kafka.out

Kafka producers.

This API can also be used by mock producers (cf. dvlopt.kafka.out.mock namespace).


dvlopt.kstreams

Kafka Streams applications.


An app is a topology of nodes sourcing, processing, and sending records. Such a topology can be built in a rather imperative fashion
by using the `dvlopt.kstreams.topology` namespace, the so-called "low-level API". The `dvlopt.kstreams.builder` namespace is the
entry point of the so-called "high-level API". It provides a "builder" which will build a topology for the user while he/she enjoys
the rather functional API. Both APIs are actually complementary. Once a builder produces a topology, it can be augmented using the
low-level API if needed.

The high-level API is usually preferred.

dvlopt.kstreams.builder

Kafka Streams high-level API, rather functional.


Overview
========

This API revolves mainly around these abstractions :


  Streams (`dvlopt.kstreams.stream`)
  ----------------------------------------

  A stream represents a sequence of records which need to be somehow transformed (mapped, filtered, etc.). It is distributed by partitions.

  Often, some kind of aggregated values need to be computed. First, values must be grouped by key to form a grouped stream. It is as if the stream is
  being divided into substreams, one for every key. Although useful, such a grouped stream does not have a notion of time. Hence, before applying any
  aggregation, a grouped stream can be windowed if needed. For instance, if keys are user names and values are clicks, we can group the stream by key,
  window per day, and then aggregate the values by counting the clicks. This would compute the number of clicks per user, per day.

  Grouped streams, windowed or not, are intermediary representations. Aggregating values always results in a table.

  Tables (`dvlopt.kstreams.table`)
  --------------------------------------

  A table associates a unique value to a key. For a given key, a new record represents an update. It can be created right away from a topic or be the
  result of an aggregation. It is backed by a key-value state store. Such a table is distributed by partitions. Hence, in order to be able to query
  all keys, if needed, the application instances must be able to query each other. Typically, tables follow delete semantics (i.e. a nil value for a
  key removes this key from the table).

  Akin to streams, tables can be re-grouped by another key. For instance, a table of users (keys) might be re-grouped into a table of countries (new keys,
  each user belongs to a country). Each country now has a list of values (the values of all the users belonging to that country) and those can be aggregated
  as well. The end result is naturally a new table.

  Global tables
  -------------

  A regular table is distributed by partitions. A global one sources its data from all the partitions of a topic at the same time. It is fine and useful
  as long as the data do not overwhelm a single instance of the Kafka Streams application.


Aggregating
===========

Values can be reduced by key. An aggregation is done much like in Clojure itself : a reducing function of the form (fn [aggregated-value key value]).
Before processing the first record of a key, a function is called for obtaining a seed (i.e. the first aggregated value). The value is aggregated
against the seed, thus bootstrapping the aggregation.
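
For instance, counting clicks per user might be sketched as follows (assuming, for illustration, that the seed function
takes no argument) :

Ex. ;; Seed function, provides the first aggregated value for a key.
    (fn seed []
      0)

    ;; Reducing function, aggregates one more click for a given user.
    (fn count-clicks [n-clicks user-name click]
      (inc n-clicks))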


State and repartitioning
========================

Just like in the low-level API, state stores are used for stateful operations. Those stores are typically created automatically and need little more
than serializers and deserializers as described in `dvlopt.kafka`.

Cf. `dvlopt.kstreams.store` for more about stores.

Any operation acting on the keys of the records typically results in a repartitioning of data at some point, either right away or later. This means the library
will persist the new records in an internal topic named '$APPLICATION_ID-$NAME-repartition'. This is needed because the way keys are partitioned is very important
for a lot of stateful operations such as joins. Operations which might lead to repartitioning document this behavior. $NAME is either generated or given by the
:dvlopt.kstreams/repartition-name option when documented.


Joins and Co-partitioning
=========================

This API offers different kinds of joining operations akin to SQL joins. For instance, a stream might be enriched by joining it with a table. Joins are
always based on the keys of the records, hence the involved topics need to be co-partitioned. It means they must share the same number of partitions as
well as the same partitioning strategy (e.g. the default one). By doing so, the library can source records from the same partition numbers, in both joined
topics, and be confident that each corresponding partition holds the same keys.

It is the responsibility of the user to guarantee the same number of partitions, otherwise an exception will be thrown. For instance, if needed, a stream
can be redirected to an adequate pre-created topic. It is easiest to use the default partitioning strategy. Other than that, a producer might decide the
partition number of the records it is sending. In Kafka Streams, the partition number can be decided when writing to a topic by using the
:dvlopt.kstreams/select-partition option. It is a function taking the total number of partitions of the topic a record is being sent to, as well as the
key and the value of this record.
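
For instance, a partition selector reproducing a simple hash-based strategy might be sketched as follows (the argument
order follows the description above) :

Ex. (fn select-partition [partition-total key value]
      (mod (hash key)
           partition-total))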

None of this applies to joins with global tables as they source data from all the available partitions.

dvlopt.kstreams.ctx

Contexts are needed for low-level processors.


dvlopt.kstreams.mock

Building a mock Kafka Streams application which does not need a Kafka cluster.


dvlopt.kstreams.store

Kafka Streams state stores.


Streaming applications often need to store some kind of state. This is the purpose of state stores, which are typically backed up
to compacted Kafka topics called changelogs in order to be fault-tolerant. Those changelog topics are named '$APPLICATION_ID-$GENERATED_NAME-changelog'.

Stores
======

Three types of stores exist, typically persistent by using RocksDB under the hood :


  Key-value stores
  ----------------

  Behave like regular tables or, when so configured, like LRU ones. A regular key-value store can also be in-memory rather than persistent,
  which is great for cloud environments where applications are started from scratch every time. However, an in-memory key-value
  store will have to catch up with its corresponding changelog every time the application is restarted.

  Persistent window stores
  -----------------------------

  In window stores, each value is associated with a key but also a timestamp. They are used for computing over fixed time intervals.

  Persistent session stores
  -------------------------

  Sessions are non-fixed intervals of activity organized around fixed intervals of inactivity.
 
  For the same key, with an interval of 5 seconds, let us suppose we receive values at time 10, 12 and 20. There is a gap of more than
  5 seconds between event 2 and 3. Hence, event 3 would be part of another session. Now let us suppose a 4th event arrives out of order
  with a timestamp of 16. Now, all 4 events would merge into the same session as no event is more than 5 seconds away from another one.
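
  To illustrate the walkthrough above (session bounds are inclusive and purely illustrative) :

    events at 10, 12, 20       ->  sessions [10 ; 12] and [20 ; 20]
    late event at 16 arrives   ->  single session [10 ; 20]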


In the high-level API, stores are often created automatically but can be tweaked to some extent. When a store needs to be created manually,
these options might be provided :

 :dvlopt.kafka/deserializer.key
 :dvlopt.kafka/deserializer.value
 :dvlopt.kafka/serializer.key
 :dvlopt.kafka/serializer.value
  Cf. `dvlopt.kafka` for description of serializers and deserializers.

 ::cache?
  Caching tries to minimize the number of updates. For instance, if 2 subsequent values share the same key, only the second one
  will be persisted. It improves performance and IO but it means records will not be persisted as soon as possible.
  True by default, should be false only for testing and debugging.

 ::changelog?
  In order for state stores to be fault tolerant, they are continuously backed up to a changelog topic behind the scenes.
  Default is true and this option should not be disabled unless specifically needed.

 ::configuration.changelog
  Map of Kafka topic properties for configuring the changelog topic.
  Cf. https://kafka.apache.org/documentation/#topicconfigs

 ::name
  Generated in the form of 'dvlopt.kafka-store-8_DIGIT_NUMBER' when not supplied.

 ::type
  Determines the type of the store, one of :

    :kv.in-memory
    :kv.lru
    :kv.regular
    :session
    :window

 LRU key-value stores have this additional option :

    ::lru-size
      Maximum number of items in the LRU store.

 Session and window stores have these additional options :

   ::retention 
    Time period for which the state store will retain historical data, cannot be smaller than the chosen interval.
    During any kind of stream processing, it is common that data arrives late or out of order and instead of dropping this data,
    it is better to update past time windows. However, because disks are not unlimited, one cannot keep the data for all time windows
    just in case. Hence the need for this option. The higher the retention period, the later data can arrive, but the more is stored.
    Default is [1 :days].

 Window stores have these additional options and mandatory arguments :

   ::duplicate-keys?
    Whether or not to retain duplicate keys, akin to caching.
    Default is false.

   ::interval (mandatory)
    Fixed interval of each window.
    Cf. `dvlopt.kafka` for description of time intervals

   ::segments
    Number of database segments (must be >= 2).
    Default is 2.
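
As an illustration, an option map for manually creating a persistent window store might look like this (names and values
are purely illustrative) :

Ex. {:dvlopt.kafka/serializer.key     :string
     :dvlopt.kafka/serializer.value   :long
     :dvlopt.kafka/deserializer.key   :string
     :dvlopt.kafka/deserializer.value :long
     :dvlopt.kstreams.store/type      :window
     :dvlopt.kstreams.store/name      "click-counts"
     :dvlopt.kstreams.store/retention [2 :days]
     :dvlopt.kstreams.store/interval  [1 :days]
     :dvlopt.kstreams.store/segments  2}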


Cursors
=======

Retrieving several values from a store always returns a stateful iterator implementing Closeable and acting as a database cursor. It must be
closed after usage, otherwise resources will leak. For ease of use, the iterator can be transformed into a sequence by using Clojure's `iterator-seq`.
However, the resulting sequence should be consumed eagerly right away.

Each item is a map containing :

  :dvlopt.kafka/key
   Deserialized key.

  :dvlopt.kafka/value
   Deserialized value.

Items from window stores and session stores also have :

  :dvlopt.kafka/timestamp.from
   Window beginning.

  :dvlopt.kafka/timestamp.to
   Window end.


Ex. ;; Eagerly sum all values.
 
    (with-open [kvs (dvlopt.kstreams.store/kv-range my-store)]
      (reduce (fn [sum kv]
                (+ sum
                   (:dvlopt.kafka/value kv)))
              0
              (iterator-seq kvs)))

dvlopt.kstreams.stream

Kafka Streams' abstraction of streams.

Cf. `dvlopt.kstreams.builder` for the big picture and details


A stream can be transformed by various functions. Those functions themselves return a new stream representing the transformation.
Hence, a single stream can be used as the base of more than one transformation.

The values of a stream can be aggregated. Prior to this, it needs to be grouped by using the `group-by` or `group-by-key` function from
this namespace. If needed, a grouped stream can then be windowed by fixed time intervals or by sessions. Then, an appropriate `reduce-*`
function can be used to perform the aggregation, which always results in a table.

A whole variety of joins between a stream and anything else are available.

dvlopt.kstreams.table

Kafka Streams' abstraction of tables.

Cf. `dvlopt.kstreams.builder` for the big picture and details


A table can be transformed by various functions. Those functions always return a new table representing the transformation.
Hence, a single table can be used for more than one transformation.

A table can be re-grouped by other keys using the `map-and-group-by` function and then the values aggregated for each key,
resulting in a new table.

A table can also be joined with another table.


A table is backed by a state store. As such, these options, called the standard options in this namespace, can very often
be supplied :

  :dvlopt.kafka/deserializer.key
  :dvlopt.kafka/deserializer.value
  :dvlopt.kafka/serializer.key
  :dvlopt.kafka/serializer.value
   Cf. `dvlopt.kafka` for description of serializers and deserializers

  :dvlopt.kstreams.store/cache?
  :dvlopt.kstreams.store/changelog?
  :dvlopt.kstreams.store/configuration.changelog
  :dvlopt.kstreams.store/name
  :dvlopt.kstreams.store/type
   Exactly as described in `dvlopt.kstreams.store` but the type is restricted to #{:kv.in-memory :kv.regular}.
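
As an illustration, a standard option map for a table backed by a regular key-value store might look like this (names
and values are purely illustrative) :

Ex. {:dvlopt.kafka/deserializer.key    :string
     :dvlopt.kafka/deserializer.value  :long
     :dvlopt.kafka/serializer.key      :string
     :dvlopt.kafka/serializer.value    :long
     :dvlopt.kstreams.store/type       :kv.regular
     :dvlopt.kstreams.store/name       "user-counts"
     :dvlopt.kstreams.store/cache?     true
     :dvlopt.kstreams.store/changelog? true}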

dvlopt.kstreams.topology

Kafka Streams low-level API, rather imperative.


This namespace is about building a topology of nodes manually. Three types of nodes exist :

  Sources
  -------

  A source acts as one of the entry points of a topology. It retrieves records from one or several topics and forwards
  them to processors or sinks.

  Processors
  ----------

  A processor processes records from sources and other processors, and forwards them to sinks or further processors.
  It can persist state by using a state store.

  Sinks
  -----

  A sink is always a terminal node. Its purpose is to persist records from sources and processors to a concrete topic.


State stores are used in order to persist state in a fault-tolerant manner. By default, they are backed up to a topic under
the hood in order to do so. A regular state store is distributed by partitions. A global one always sources data from
all partitions at the same time.

Cf. `dvlopt.kstreams.store` for more