Kafka Streams low-level API, rather imperative.
This namespace is about building a topology of nodes manually. Three types of nodes exist :
A source acts as one of the entry points of a topology. It retrieves records from one or several topics and forwards them to processors or sinks.
A processor processes records from sources and other processors and forwards them to other processors or sinks. It can persist state by using a state store.
A sink is always a terminal node. Its purpose is to persist records from sources and processors to a concrete topic.
State stores are used in order to persist state in a fault-tolerant manner. By default, they are backed up to a topic under the hood in order to do so. A regular state store is distributed following partitions, while a global one always sources data from all partitions at the same time.
Cf. `dvlopt.kstreams.store` for more.
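As a sketch of how these pieces fit together, here is a minimal topology wiring a source into a processor and a sink. The topic and node names are hypothetical, and this namespace is assumed to be required under the alias `topology` (the functions accept the topology as first argument, so they thread with `->`) :

```clojure
(require '[dvlopt.kstreams.topology :as topology])  ;; assumed alias for this namespace

(def my-topology
  (-> (topology/topology)
      ;; Entry point, consuming a hypothetical input topic.
      (topology/add-source "my-source"
                           ["my-input-topic"])
      ;; Processing node fed by the source.
      (topology/add-processor "my-processor"
                              ["my-source"]
                              {:dvlopt.kstreams/processor.on-record
                               (fn [ctx state record]
                                 ;; ... process the record ...
                                 )})
      ;; Terminal node persisting records to an output topic.
      (topology/add-sink "my-sink"
                         ["my-processor"]
                         "my-output-topic")))
```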
(add-global-store topology
source-name
source-topic
processor-name
processor
options)
Adds a global state store to the topology which, unlike a regular one, is directly connected to a topic and sources records from all its partitions.
A processor is needed in order to update the global state.
A map of options may be given, containing options for the store as described in `dvlopt.kstreams.store` as well as :
:dvlopt.kstreams/extract-timestamp (fn [last-timestamp record]) Function accepting the timestamp of the previous record and the current record, and returning the timestamp chosen for the current record.
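A hedged sketch with hypothetical names : the processor keeps the global store up to date from the records of the topic. Store options from `dvlopt.kstreams.store` are omitted here.

```clojure
(topology/add-global-store my-topology
                           "config-source"      ;; hypothetical source name
                           "config-topic"       ;; topic feeding the global store
                           "config-processor"   ;; hypothetical processor name
                           {:dvlopt.kstreams/processor.on-record
                            (fn [ctx state record]
                              ;; ... update the global store from the record ...
                              )}
                           {})                  ;; options, cf. `dvlopt.kstreams.store`
```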
(add-processor topology processor-name parents processor)
Adds a processing node to the topology.
Parents are names of sources and other processors streaming their data into this one.
Just like its parents, the processor must be named uniquely so it can later be referred to by other nodes in the topology.
A processor is a map, or a function producing a map, containing any of these functions :
:dvlopt.kstreams/processor.init (fn [ctx])
Given a context, initializes processing.
The returned value is considered an in-memory state that will be passed to :dvlopt.kstreams/processor.on-record every time.
Cf. `dvlopt.kstreams.ctx`
:dvlopt.kstreams/processor.on-record (fn [ctx user-state record])
Given a context and some user state produced by :dvlopt.kstreams/processor.init, processes a record. When needed, arbitrary key-values
can be sent downstream to child nodes using `dvlopt.kstreams.ctx/forward`
on the context.
:dvlopt.kstreams/processor.close (fn []) For producing a side-effect when the processor is shutting down, such as releasing resources. Resources such as state stores are already closed by the library.
Ex. (add-processor topology
                   "my-processor"
                   ["parent-1" "parent-2"]
                   {:dvlopt.kstreams/processor.init      (fn [ctx]
                                                           (dvlopt.kstreams.ctx/kv-store ctx "my-store"))
                    :dvlopt.kstreams/processor.on-record (fn [ctx my-store record]
                                                           ...)
                    :dvlopt.kstreams/processor.close     (fn []
                                                           (println "Bye bye processor"))})
(add-sink topology sink-name parents topic)
(add-sink topology sink-name parents topic options)
Adds a sink topic to the topology.
Parents are names of sources and other processors streaming their data into this one.
Just like its parents, the sink must be named uniquely so it can later be referred to by other nodes in the topology.
A map of options may be given :
:dvlopt.kafka/serializer.key
:dvlopt.kafka/serializer.value
Cf. `dvlopt.kafka` for a description of serializers.
:dvlopt.kstreams/select-partition Function called in order to determine the partition number the record belongs to. If missing, the record will be automatically partitioned by its key.
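For instance, a sketch with hypothetical names, forcing every record to a single partition. The exact arguments of the partitioning function are an assumption, they are not documented here :

```clojure
(topology/add-sink my-topology
                   "my-sink"
                   ["my-processor"]
                   "my-output-topic"
                   {:dvlopt.kstreams/select-partition
                    (fn [record n-partitions]  ;; assumed signature
                      0)})
```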
(add-source topology source-name source)
(add-source topology source-name source options)
Adds a source (either a list of topics or a regular expression) to the topology.
It must be named uniquely so it can later be used by other nodes in the topology.
A map of options may be given :
:dvlopt.kafka/deserializer.key
:dvlopt.kafka/deserializer.value
Cf. `dvlopt.kafka` for a description of deserializers.
:dvlopt.kstreams/extract-timestamp Function accepting the timestamp of the previous record and the current record, and returning the timestamp chosen for the current record.
:dvlopt.kstreams/offset-reset Whether, when a topic is consumed for the first time, it should start from the :earliest offset (default) or the :latest one.
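For instance, a sketch with hypothetical topic names, consuming from the earliest available offsets. Deserializers from `dvlopt.kafka` are left to their defaults :

```clojure
(topology/add-source my-topology
                     "my-source"
                     ["topic-a" "topic-b"]
                     {:dvlopt.kstreams/offset-reset :earliest})
```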
(add-store topology processor-names)
(add-store topology processor-names options)
Adds a state store to the topology which can later be used by the given processors.
A map of options may be given, supporting all options described in `dvlopt.kstreams.store`.
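A minimal sketch with a hypothetical processor name. The store options (name, type, etc.) are described in `dvlopt.kstreams.store` and omitted here :

```clojure
(topology/add-store my-topology
                    ["my-processor"])
```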
(describe topology)
Describes the given topology.
Returns a map of internal id -> subgraph, where a subgraph is a map containing :
:dvlopt.kstreams/subgraph-type Either :global-store or :subtopology.
Subgraphs of type :global-store contain :
:dvlopt.kafka/topic
Where records are sourced from for populating the global store.
:dvlopt.kstreams/processor.name
Name of the processor node updating the global store.
:dvlopt.kstreams/source.name
Name of the source node fetching records from the topic.
:dvlopt.kstreams.store/name
Name of the global store.
Subgraphs of type :subtopology contain :
:dvlopt.kstreams/nodes
Map of node name -> map possibly containing :
:dvlopt.kstreams/children
Set of children nodes.
:dvlopt.kstreams/parents
Set of parent nodes.
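Based on the keys above, the returned value for a simple source -> processor subtopology might look like the following sketch (node names and the internal id are hypothetical) :

```clojure
(describe my-topology)
;; A sketch of the shape, not actual output :
;; {0 {:dvlopt.kstreams/subgraph-type :subtopology
;;     :dvlopt.kstreams/nodes
;;       {"my-source"    {:dvlopt.kstreams/children #{"my-processor"}}
;;        "my-processor" {:dvlopt.kstreams/parents  #{"my-source"}}}}}
```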
(topology)
(topology builder)
Creates a new topology from scratch or from a builder.
Cf. `dvlopt.kstreams.build/builder`
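For instance (assuming `dvlopt.kstreams.build` is required under the alias `build`) :

```clojure
;; From scratch :
(topology)

;; From a builder of the high-level API :
(topology (build/builder))
```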