Kafka Streams' abstraction of streams.
Cf. `dvlopt.kstreams.builder` for the big picture and details.
A stream can be transformed by various functions. Those functions themselves return a new stream representing the transformation. Hence, a single stream can be used as the base of more than one transformation.
The values of a stream can be aggregated. Prior to this, the stream needs to be grouped by using the `group-by` or `group-by-key` function from this namespace. If needed, a grouped stream can then be windowed by fixed time intervals or by sessions. Then, an appropriate `reduce-*` function can be used to perform the aggregation, which always results in a table.
A whole variety of joins between a stream and anything else is available.
(branch stream predicates)
Given a list of predicate functions, returns a vector of new corresponding streams such that as soon as a key-value from the original stream returns true on a predicate, it is sent to the corresponding stream (and only that one).
A record is dropped if it matches no predicate.
Ex. ;; Divides a stream into 3 new streams containing respectively :red values, :black ones, and any other ones.
    (branch stream
            [(fn only-red [k v]
               (= v
                  :red))
             (fn only-black [k v]
               (= v
                  :black))
             (fn other [k v]
               true)])
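The routing rule can be modelled in plain Clojure, without any Kafka dependency. The sketch below only illustrates the semantics described above and is not the library's implementation. `route` is a hypothetical helper which returns the index of the first predicate a key-value satisfies, or nil when the record would be dropped.

```clojure
;; Hypothetical helper, not part of dvlopt.kstreams: models how `branch`
;; picks a destination stream for a single key-value.
(defn route
  "Returns the index of the first predicate returning true for `k` and `v`,
   or nil if no predicate matches (the record would be dropped)."
  [predicates k v]
  (first (keep-indexed (fn [i pred]
                         (when (pred k v)
                           i))
                       predicates)))

(def predicates
  [(fn only-red   [k v] (= v :red))
   (fn only-black [k v] (= v :black))])

(route predicates "a" :red)    ; index 0, the stream of :red values
(route predicates "b" :black)  ; index 1, the stream of :black values
(route predicates "c" :green)  ; nil, no predicate matches
```

Adding a catch-all predicate as the last element, as in the example above, guarantees that no record is dropped.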
(do-kv stream f)
For each key-value, do a side-effect and then continue streaming.
Side effects cannot be tracked by Kafka, hence they do not benefit from Kafka's processing guarantees.
Ex. (do-kv stream (fn [k v] (println :key k :value v)))
(filter-kv stream predicate)
Returns a new stream keeping only the key-values of the given one for which the predicate returns true.
Ex. ;; Keeps values greater than or equal to 42.
    (filter-kv stream
               (fn [k v]
                 (>= v
                     42)))
(fmap-kv kstream f)
Returns a new stream mapping key-values from the given one into a collection of [key value]'s (or nil).
Marks the resulting stream for repartitioning.
Cf. `dvlopt.kstreams.builder`
Ex. ;; The value is a list of tokens and we want to count them individually so that we end up with a collection where the key is a unique token and the value is its count.
    (fmap-kv stream
             (fn [k v]
               (reduce (fn [hmap token]
                         (assoc hmap
                                token
                                (inc (get hmap
                                          token
                                          0))))
                       {}
                       v)))
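To see what such a function yields, it can be called directly on a sample value. This is a plain Clojure sketch. The name `count-tokens` and the sample data are chosen here for illustration only. The returned map is a collection of [token count] entries, which is the shape `fmap-kv` expects.

```clojure
;; The same function as in the example above, given a name so that it can
;; be called outside of a topology.
(defn count-tokens
  [k v]
  (reduce (fn [hmap token]
            (assoc hmap
                   token
                   (inc (get hmap token 0))))
          {}
          v))

;; Counts each distinct token of the value; the key is not used.
(count-tokens "doc-1" ["to" "be" "or" "not" "to" "be"])
;; "to" and "be" are associated with 2, "or" and "not" with 1
```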
(fmap-values kstream f)
Returns a new stream mapping values from the given one into a collection of values (or nil).
Unlike `fmap-kv`, does not mark the resulting stream for repartitioning as the keys remain intact.
Ex. ;; The value is a list we want to split into individual items while filtering out nils.
    (fmap-values stream
                 (fn [k v]
                   (filter some?
                           v)))
(group-by stream f)
(group-by kstream f options)
Returns a new grouped stream grouping values based on a chosen key.
Drops records leading to a nil chosen key.
Because a new key is explicitly selected, the data is repartitioned.
Cf. `dvlopt.kstreams.builder` about repartitioning.
A map of options may be given:
  :dvlopt.kafka/deserializer.key
  :dvlopt.kafka/deserializer.value
  :dvlopt.kafka/serializer.key
  :dvlopt.kafka/serializer.value
    Cf. `dvlopt.kafka` for description of serializers and deserializers.
  :dvlopt.kstreams/repartition-name
    Cf. `dvlopt.kstreams.builder`, section "State and repartitioning"
Ex. (group-by stream (fn by-country [k v] (:country v)))
(group-by-key stream)
(group-by-key stream options)
Exactly like `group-by` but groups directly by the original key rather than a selected one.
Unless the stream was marked for repartitioning, this function will not repartition it as keys remain intact.
(join-with-global-table left-stream right-global-table f-map-k f-join)
Similar to `join-with-table` but offers the following:
- Data need not be co-partitioned.
- More efficient when a regular table is skewed (some partitions are heavily populated, some not).
- Typically more efficient than performing several joins in order to reach the same result.
- Allows for joining on a different key.
Each record of the input stream is mapped to a new key which triggers a join if the global table contains that mapped key. The value resulting from the join is associated with the key of the original record.
Records from the input stream with a nil key or value are ignored. Records from the global table with a nil value remove the corresponding key from this global table.
The input stream is repartitioned if it was marked for repartitioning.
Ex. ;; Enrich a stream of users by adding the most popular song of the country they are from.
    (join-with-global-table stream
                            global-table
                            (fn map-k [k v]
                              (:country v))
                            (fn join [v-stream v-global-table]
                              (assoc v-stream
                                     :song
                                     (first (:top-10 v-global-table)))))
(join-with-stream left-stream right-stream f interval)
(join-with-stream left-stream right-stream f interval options)
Returns a new stream inner joining values from both given streams every time:
a) They share the same key.
b) The difference in the timestamps of both records is <= the given interval.
   Cf. `dvlopt.kafka` for descriptions of time intervals.
Hence, for the same key, several joins might happen within a single interval.
Records with a nil key or nil value are ignored.
Input streams are repartitioned if they were marked for repartitioning and a store will be generated for each of them.
Cf. `dvlopt.kstreams.builder` for requirements related to joins.
A map of options may be given:
  :dvlopt.kafka/deserializer.key
  :dvlopt.kafka/serializer.key
  :dvlopt.kstreams/left
  :dvlopt.kstreams/right
    Maps which may contain value ser/de:
      :dvlopt.kafka/deserializer.value
      :dvlopt.kafka/serializer.value
    Cf. `dvlopt.kafka` for description of serializers and deserializers.
  :dvlopt.kstreams.store/retention
    Cf. `dvlopt.kstreams.store`
Ex. (join-with-stream left-stream
                      right-stream
                      (fn [v-left v-right]
                        (str "left = " v-left " and right = " v-right))
                      [2 :seconds])
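The two join conditions can be illustrated on in-memory data. The following is a simplified batch model in plain Clojure, not the library's implementation: it ignores arrival order, state stores, retention and nil handling. Records are represented as maps with hypothetical `:key`, `:timestamp` (in milliseconds) and `:value` entries, and `joinable?` and `inner-join` are names made up for this sketch.

```clojure
;; Hypothetical model of the inner join conditions, not part of dvlopt.kstreams.
(defn joinable?
  "True if both records share the same key and their timestamps differ
   by at most `interval-ms`."
  [interval-ms left right]
  (let [delta (- (:timestamp left) (:timestamp right))]
    (and (= (:key left) (:key right))
         (<= (if (neg? delta) (- delta) delta)
             interval-ms))))

(defn inner-join
  "Joins every pair of left and right records satisfying `joinable?`,
   combining their values with `f`."
  [f interval-ms lefts rights]
  (for [l     lefts
        r     rights
        :when (joinable? interval-ms l r)]
    {:key   (:key l)
     :value (f (:value l) (:value r))}))

(inner-join +
            2000
            [{:key "a" :timestamp 1000 :value 1}
             {:key "b" :timestamp 1000 :value 2}]
            [{:key "a" :timestamp 2500 :value 10}
             {:key "a" :timestamp 2900 :value 20}
             {:key "a" :timestamp 4000 :value 40}
             {:key "b" :timestamp 1500 :value 30}])
;; "a" is joined twice (values 11 and 21), "b" once (value 32);
;; the "a" record at 4000 ms is too far from the left one.
```

The two results for key "a" show why several joins might happen for the same key within a single interval.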
(join-with-table left-stream right-table f)
(join-with-table left-stream right-table f options)
Returns a new stream inner joining the given stream with the given table. The join is triggered every time a new record arrives in the stream and the table contains the key of this record. This is very useful for enriching a stream with some up-to-date information.
Records from the input stream with a nil key or value are ignored. Records from the input table with a nil value remove the corresponding key from this table.
The input stream is repartitioned if it was marked for repartitioning.
A map of options may be given, just like in `join-with-stream`.
Ex. (join-with-table stream
                     table
                     (fn [v-stream v-table]
                       (assoc v-stream
                              :last-known-location
                              (:location v-table))))
(left-join-with-global-table left-stream right-global-table f-map-k f-join)
Exactly like `join-with-global-table` but the join is triggered even if the global table does not contain the mapped key. In such a case, the right value supplied during the join is nil.
(left-join-with-stream left-stream right-stream f interval)
(left-join-with-stream left-stream right-stream f interval options)
Exactly like `join-with-stream` but the join is triggered even if there is no record with the same key yet in the right stream for a given time window. In such a case, the right value provided for the join is nil.
(left-join-with-table left-stream right-table f)
(left-join-with-table left-stream right-table f options)
Exactly like `join-with-table` but the join is triggered even if the table does not contain the key. In such a case, the right value supplied during the join is nil.
(map-keys stream f)
Returns a new stream efficiently mapping keys from the given one.
Marks the resulting stream for repartitioning.
Cf. `dvlopt.kstreams.builder`
Ex. ;; The key is an ip address mapped to a country.
    (map-keys stream
              (fn [k v]
                (country-from-ip k)))
(map-kv stream f)
Returns a new stream mapping key-values from the given one.
Marks the resulting stream for repartitioning.
Cf. `dvlopt.kstreams.builder`
Ex. ;; The key is an ip address mapped to a country and the value is a collection mapped to its number of items.
    (map-kv stream
            (fn [k v]
              [(country-from-ip k)
               (count v)]))
(map-values kstream f)
Returns a new stream efficiently mapping values from the given one.
Unlike other mapping functions, does not mark the resulting stream for repartitioning as the keys remain intact.
Ex. ;; The value is a collection mapped to its number of items.
    (map-values stream
                (fn [k v]
                  (count v)))
(merge-stream stream other-stream)
Returns a new stream merging the two given ones.
There is no guarantee about the ordering of the records between streams. However, order is preserved for each stream.
(outer-join-with-stream left-stream right-stream f interval)
(outer-join-with-stream left-stream right-stream f interval options)
Exactly like `join-with-stream` but the join is triggered even if there is no record with the same key yet in the other stream for a given time window. In such a case, the value provided for the missing side of the join is nil.
(process stream processor)
(process stream processor options)
Returns a new stream processing the records from the given one using a low-level processor.
For when the high-level API is not enough. It is typically used for an `fmap-kv`-like behavior involving some state.
Cf. `dvlopt.kstreams.topology/add-processor`
    :dvlopt.kstreams/processor.on-record may be used to explicitly forward records using the associated context.
Marks the resulting stream for repartitioning.
Cf. `dvlopt.kstreams.builder`
A map of options may be given:
  :dvlopt.kstreams.store/names
    List of state store names, previously added to the underlying builder, that this processor needs to access.
    Cf. `dvlopt.kstreams.builder/add-store`
(process-values stream processor)
(process-values stream processor options)
Returns a new stream efficiently processing the values of the given one using a low-level processor.
Unlike `process`, does not mark the resulting stream for repartitioning as the keys remain intact. Also, because this function is about processing values, there is no point in forwarding records using the associated context, and doing so will throw an exception. Rather, the library maps the original value of each record to the value returned by :dvlopt.kstreams/processor.on-record (which may be nil).
It is typically used when some state is needed.
A map of options may be given, just like in `process`.
(reduce-sessions session-windowed-stream fn-reduce fn-merge fn-seed)
(reduce-sessions session-windowed-stream fn-reduce fn-merge fn-seed options)
Returns a new table aggregating values for each session of each key for the given session-windowed stream.
Sessions might merge, hence the need for a function being able to do so.
A map of standard table options may be given (cf. `dvlopt.kstreams.table`).
Ex. (reduce-sessions session-windowed-stream
                     (fn reduce [aggregated k v]
                       (+ aggregated v))
                     (fn merge [aggregated-session-1 aggregated-session-2 k]
                       (+ aggregated-session-1 aggregated-session-2))
                     (fn seed []
                       0))
(reduce-values grouped-stream fn-reduce fn-seed)
(reduce-values grouped-stream fn-reduce fn-seed options)
Returns a new table aggregating values for each key of the given grouped stream.
A map of standard table options may be given (cf. `dvlopt.kstreams.table`).
Ex. (reduce-values grouped-stream
                   (fn reduce [aggregated k v]
                     (+ aggregated v))
                   (fn seed []
                     0))
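The roles of the reducing and seeding functions can be modelled on an in-memory collection of [key value] pairs. This is a plain Clojure sketch of the aggregation semantics, not the library's implementation. `reduce-by-key` is a hypothetical helper, and the resulting map stands in for the table.

```clojure
;; Hypothetical model of a per-key aggregation, not part of dvlopt.kstreams.
(defn reduce-by-key
  "Folds `records`, a collection of [key value] pairs, into a map of
   key -> aggregated value. The aggregate of a key seen for the first
   time starts from the value returned by `fn-seed`."
  [fn-reduce fn-seed records]
  (reduce (fn [table [k v]]
            (assoc table
                   k
                   (fn-reduce (get table k (fn-seed))
                              k
                              v)))
          {}
          records))

(reduce-by-key (fn [aggregated k v] (+ aggregated v))
               (fn [] 0)
               [[:fr 1] [:be 2] [:fr 3]])
;; => {:fr 4, :be 2}
```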
(reduce-windows time-windowed-stream fn-reduce fn-seed)
(reduce-windows time-windowed-stream fn-reduce fn-seed options)
Returns a new table aggregating values for each time window of each key of the given time-windowed stream.
Cf. `reduce-values`
A map of standard table options may be given (cf. `dvlopt.kstreams.table`).
(sink-do stream f)
Marks the end of the given stream by doing a side-effect for each key-value.
Side effects cannot be tracked by Kafka, hence they do not benefit from Kafka's processing guarantees. Returns nil.
Ex. (sink-do stream (fn [k v] (println :key k :value v)))
(sink-process stream processor)
(sink-process stream processor options)
Marks the end of the given stream by processing each record with a low-level processor.
Returns nil.
A map of options may be given, just like in `process`.
(sink-topic stream topic)
(sink-topic stream topic options)
Marks the end of the given stream by sending each key-value to a chosen topic.
In this case, topic might be either:
- [:always TopicName] Where TopicName is the name of the unique topic the records will always be sent to.
- [:select (fn [record])] Where the function is used to dynamically select a topic. All the values in the provided record refer to what is known about the original record sourced from Kafka. As such, even ::topic might be missing.
Returns nil.
A map of options may be given, the same one as for `through-topic`.
(through-topic stream topic)
(through-topic stream topic options)
Sends each record of the given stream to a given topic and then continues streaming.
Useful when the stream was marked for repartitioning. Streaming to a chosen topic before a stateful operation will avoid automatic repartitioning.
A map of options may be given:
  :dvlopt.kafka/deserializer.key
  :dvlopt.kafka/deserializer.value
  :dvlopt.kafka/serializer.key
  :dvlopt.kafka/serializer.value
    Cf. `dvlopt.kafka` for description of serializers and deserializers.
  :dvlopt.kstreams/select-partition
    Cf. `dvlopt.kstreams.builder`
(window grouped-stream interval)
(window grouped-stream interval options)
Returns a new time-windowed stream windowing values by a fixed time interval for each key of the given grouped stream.
A map of options may be given:
  :dvlopt.kstreams.store/retention
    Cf. `dvlopt.kstreams.store`
  ::interval.type
    Fixed time windows can behave in 3 fashions:
    :hopping
      Hopping windows of interval I advance by sub-interval J <= I and are aligned to the epoch, meaning the first window starts at time 0. The lower bound is inclusive and the upper bound is exclusive.
      Ex. I = 5 seconds, J = 3 seconds (thus every window overlaps with its neighbor by 2 seconds)
          [0;5), [3;8), [6;11), ...
    :tumbling (default)
      Tumbling windows are simply non-overlapping hopping windows (i.e. I = J).
      Ex. [0;5), [5;10), [10;15), ...
    :sliding
      Sliding windows are aligned to the timestamps of the records and not the epoch. Two records are said to be part of the same sliding window if the difference of their timestamps is <= I, where both the lower and upper bound are inclusive.
Windows of type :hopping accept the following additional option:
  ::advance-by
    The J sub-interval.
Cf. `dvlopt.kafka` for description of time intervals.
    `dvlopt.kstreams.store` for description of time windows.
Ex. ;; Windows of 5 seconds advancing by 3 seconds (thus overlapping by 2 seconds).
    (window grouped-stream
            [5 :seconds]
            {::interval.type :hopping
             ::advance-by    [3 :seconds]})
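The arithmetic behind hopping windows can be checked with a small calculation. The sketch below is plain Clojure and independent of the library. `windows-containing` is a hypothetical helper which lists every epoch-aligned window a given timestamp falls into, assuming non-negative timestamps and intervals expressed in milliseconds.

```clojure
;; Hypothetical helper, not part of dvlopt.kstreams.
(defn windows-containing
  "Returns a vector of [start end] bounds (start inclusive, end exclusive)
   for every hopping window of size `interval-ms`, advancing by
   `advance-ms` from time 0, which contains `timestamp-ms`."
  [interval-ms advance-ms timestamp-ms]
  (let [;; Latest window start which is <= the timestamp.
        last-start (* advance-ms
                      (quot timestamp-ms advance-ms))]
    (->> (iterate #(- % advance-ms) last-start)
         ;; Walk back through earlier starts while the window still
         ;; covers the timestamp and does not begin before the epoch.
         (take-while (fn [start]
                       (and (>= start 0)
                            (< timestamp-ms (+ start interval-ms)))))
         (map (fn [start]
                [start (+ start interval-ms)]))
         reverse
         vec)))

;; I = 5 seconds, J = 3 seconds
(windows-containing 5000 3000 4000)  ; => [[0 5000] [3000 8000]]
(windows-containing 5000 3000 5000)  ; => [[3000 8000]]

;; Tumbling windows, I = J = 5 seconds
(windows-containing 5000 5000 5000)  ; => [[5000 10000]]
```

A record at 4 seconds belongs to both [0;5) and [3;8), whereas with tumbling windows every record belongs to exactly one window.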
(window-by-session grouped-stream interval)
(window-by-session grouped-stream interval options)
Returns a new session windowed stream windowing values by sessions for each key of the given grouped stream.
Sessions are non-fixed intervals of activity between fixed intervals of inactivity.
Cf. `dvlopt.kafka` for description of time intervals.
    `dvlopt.kstreams.store` for description of sessions.
A map of options may be given:
  :dvlopt.kstreams.store/retention
    Cf. `dvlopt.kstreams.store`
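How records of a single key fall into sessions can be pictured with a small model. The following plain Clojure sketch is not the library's implementation. `sessions` is a hypothetical helper which groups timestamps (in milliseconds) into [start end] sessions, and treating a gap exactly equal to the inactivity interval as part of the same session is an assumption of this sketch.

```clojure
;; Hypothetical helper, not part of dvlopt.kstreams.
(defn sessions
  "Groups `timestamps` into a vector of [start end] sessions. A timestamp
   extends the current session when it is at most `gap-ms` after the end
   of that session, otherwise it starts a new one."
  [gap-ms timestamps]
  (reduce (fn [acc t]
            (let [[start end] (peek acc)]
              (if (and end
                       (<= (- t end) gap-ms))
                (conj (pop acc) [start t])
                (conj acc [t t]))))
          []
          (sort timestamps)))

;; Inactivity interval of 5 seconds
(sessions 5000 [0 2000 10000 12000 30000])
;; => [[0 2000] [10000 12000] [30000 30000]]
```

Unlike fixed time windows, the bounds of a session depend only on the timestamps of the records, which is why a late record can bridge two existing sessions and cause them to merge.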