dvlopt.kstreams.stream

Kafka Streams' abstraction of streams.

Cf. dvlopt.kstreams.builder for the big picture and details.

A stream can be transformed by various functions. Each of those functions returns a new stream representing the transformation. Hence, a single stream can serve as the base for more than one transformation.

The values of a stream can be aggregated. Prior to this, the stream needs to be grouped using the group-by or group-by-key function from this namespace. If needed, a grouped stream can then be windowed by fixed time intervals or by sessions. Then, an appropriate reduce-* function can be used to perform the aggregation, which always results in a table.

A variety of joins between a stream and another stream, a table, or a global table are available.

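To make the grouping, windowing and aggregating steps concrete, here is a minimal sketch in plain Clojure. It does not call this library: tumbling-sum and the record shape (maps with :key, :value and :timestamp in milliseconds) are assumptions made for illustration only.

```clojure
;; Plain-Clojure model of group -> window -> reduce (not the library API).
;; Records are hypothetical maps with :key, :value and :timestamp (milliseconds).

(defn tumbling-sum
  "Sums values per key and per tumbling window of `window-ms` milliseconds."
  [window-ms records]
  (reduce (fn [table record]
            (let [{k :key v :value ts :timestamp} record
                  ;; Window start is the timestamp rounded down to a multiple of the window size.
                  window-start (* window-ms (quot ts window-ms))]
              (update table [k window-start] (fnil + 0) v)))
          {}
          records))

(def sample-records
  [{:key :a :value 1  :timestamp 0}
   {:key :a :value 2  :timestamp 4000}
   {:key :a :value 3  :timestamp 5000}
   {:key :b :value 10 :timestamp 1000}])

(tumbling-sum 5000 sample-records)
;; => {[:a 0] 3, [:a 5000] 3, [:b 0] 10}
```

Each entry of the result plays the role of a row in the resulting table, identified by the original key and the start of its window.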

branch

(branch stream predicates)

Given a list of predicate functions, returns a vector of corresponding new streams. Predicates are tried in order and each key-value from the original stream is sent to the stream of the first predicate returning true (and only that one).

A record is dropped if it matches no predicate.

Ex. ;; Divides a stream into 3 new streams containing respectively :red values, :black ones, and any other ones.

(branch stream
        [(fn only-red [k v]
           (= v
              :red))
         (fn only-black [k v]
           (= v
              :black))
         (fn other [k v]
           true)])
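
The routing rule (first matching predicate wins, unmatched records are dropped) can be sketched in plain Clojure. This is only a model of the rule and does not use Kafka Streams; first-match, route-records and the [key value] record shape are hypothetical.

```clojure
;; Plain-Clojure model of the routing rule only; it does not use Kafka Streams.

(defn first-match
  "Returns the index of the first predicate returning true for `k` and `v`, or nil."
  [predicates k v]
  (first (keep-indexed (fn [i predicate]
                         (when (predicate k v)
                           i))
                       predicates)))

(defn route-records
  "Returns a vector holding, for each predicate, the records routed to it."
  [predicates records]
  (reduce (fn [branches [k v]]
            (if-some [i (first-match predicates k v)]
              (update branches i conj [k v])
              ;; No predicate matched: the record is dropped.
              branches))
          (vec (repeat (count predicates) []))
          records))

(route-records [(fn only-red [k v] (= v :red))
                (fn only-black [k v] (= v :black))]
               [[1 :red] [2 :blue] [3 :black] [4 :red]])
;; => [[[1 :red] [4 :red]] [[3 :black]]]
```

Adding a last predicate that always returns true, as in the example above, turns the final branch into a catch-all so that nothing is dropped.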

do-kv

(do-kv stream f)

For each key-value, performs a side effect and then continues streaming.

Side effects cannot be tracked by Kafka, hence they do not benefit from Kafka's processing guarantees.

Ex. (do-kv stream
           (fn [k v]
             (println :key k :value v)))

filter-kv

(filter-kv stream predicate)

Returns a new stream keeping only the key-values of the given one for which the predicate returns true.

Ex. ;; Keeps values greater than or equal to 42.

(filter-kv stream
           (fn [k v]
             (>= v
                 42)))

fmap-kv

(fmap-kv kstream f)

Returns a new stream mapping key-values from the given one into a collection of [key value]'s (or nil).

Marks the resulting stream for repartitioning. Cf. dvlopt.kstreams.builder

Ex. ;; The value is a list of tokens and we want to count them individually so that we end up with a collection where the key is a unique token and the value is its count.

(fmap-kv stream
         (fn [k v]
           (reduce (fn [hmap token]
                     (assoc hmap
                            token
                            (inc (get hmap
                                      token
                                      0))))
                   {}
                   v)))
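
To see what the mapping function of the example returns, it can be called directly on a sample value. The sketch below is plain Clojure; the name count-tokens, the key :doc-1 and the sample tokens are made up for illustration.

```clojure
;; The function from the example, named so it can be called outside of a stream.
(defn count-tokens
  [k v]
  (reduce (fn [hmap token]
            (assoc hmap
                   token
                   (inc (get hmap token 0))))
          {}
          v))

(count-tokens :doc-1 ["a" "b" "a"])
;; => {"a" 2, "b" 1}

;; Seen as a sequence, the returned map is a collection of [key value] pairs.
(sort (map vec (count-tokens :doc-1 ["a" "b" "a"])))
;; => (["a" 2] ["b" 1])
```

Under these assumptions, one input record holding three tokens yields two output records, each keyed by a token, which is why the keys change and the stream is marked for repartitioning.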

fmap-values

(fmap-values kstream f)

Returns a new stream mapping values from the given one into a collection of values (or nil).

Unlike fmap-kv, does not mark the resulting stream for repartitioning as the keys remain intact.

Ex. ;; The value is a list we want to split into individual items while filtering out nils.

(fmap-values stream
             (fn [k v]
               (filter some?
                       v)))

group-by

(group-by stream f)
(group-by kstream f options)

Returns a new grouped stream grouping values based on a chosen key.

Drops records leading to a nil chosen key.

Because a new key is explicitly selected, the data is repartitioned. Cf. dvlopt.kstreams.builder about repartitioning.

A map of options may be given :

  :dvlopt.kafka/deserializer.key
  :dvlopt.kafka/deserializer.value
  :dvlopt.kafka/serializer.key
  :dvlopt.kafka/serializer.value
   Cf. dvlopt.kafka for description of serializers and deserializers.

  :dvlopt.kstreams/repartition-name
   Cf. dvlopt.kstreams.builder section "State and repartitioning"

Ex. (group-by stream
              (fn by-country [k v]
                (:country v)))

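The effect of choosing a new key, including the dropping of records whose chosen key is nil, can be modelled in plain Clojure. group-records and the sample records are hypothetical and only illustrate the rule; they are not part of this library.

```clojure
;; Plain-Clojure model of grouping by a chosen key (not the library API).
(defn group-records
  "Groups the values of [key value] `records` by the key chosen by `f`."
  [records f]
  (reduce (fn [groups [k v]]
            (if-some [chosen-k (f k v)]
              (update groups chosen-k (fnil conj []) v)
              ;; A nil chosen key drops the record.
              groups))
          {}
          records))

(group-records [[1 {:country :fr :name "a"}]
                [2 {:name "b"}]
                [3 {:country :fr :name "c"}]]
               (fn by-country [k v]
                 (:country v)))
;; => {:fr [{:country :fr, :name "a"} {:country :fr, :name "c"}]}
```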

group-by-key

(group-by-key stream)
(group-by-key stream options)

Exactly like group-by but groups directly by the original key rather than a selected one.

Unless the stream was marked for repartitioning, this function will not repartition it as keys remain intact.

join-with-global-table

(join-with-global-table left-stream right-global-table f-map-k f-join)

Similar to join-with-table but offers the following :

  • Data need not be co-partitioned.
  • More efficient when a regular table is skewed (some partitions are heavily populated, some not).
  • Typically more efficient than performing several joins in order to reach the same result.
  • Allows for joining on a different key.

Each record of the input stream is mapped to a new key which triggers a join if the global table contains that mapped key. The value resulting from the join is associated with the key of the original record.

Records from the input stream with a nil key or value are ignored. Records from the global table with a nil value remove the corresponding key from this global table.

The input stream is repartitioned if it was marked for repartitioning.

Ex. ;; Enrich a stream of users by adding the most popular song of the country they are from.

(join-with-global-table stream
                        global-table
                        (fn map-k [k v]
                          (:country v))
                        (fn join [v-stream v-global-table]
                          (assoc v-stream
                                 :song
                                 (first (:top-10 v-global-table)))))
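
The role of the two functions (one mapping the record to a lookup key, one joining the values) can be sketched in plain Clojure, with a regular map standing in for the global table. join-global and the sample data are hypothetical and do not reflect how the library is implemented.

```clojure
;; Plain-Clojure model of an inner join against a lookup map (not the library API).
(defn join-global
  [records global-table f-map-k f-join]
  (for [[k v] records
        ;; Records with a nil key or value are ignored.
        :when (and (some? k) (some? v))
        :let  [mapped-k (f-map-k k v)]
        ;; Inner join: nothing is emitted when the mapped key is unknown.
        :when (contains? global-table mapped-k)]
    ;; The joined value is associated with the ORIGINAL key.
    [k (f-join v (get global-table mapped-k))]))

(join-global [[:alice {:country :fr}]
              [:bob   {:country :xx}]
              [nil    {:country :fr}]]
             {:fr {:top-10 ["song-a" "song-b"]}}
             (fn map-k [k v]
               (:country v))
             (fn join [v-stream v-global-table]
               (assoc v-stream
                      :song
                      (first (:top-10 v-global-table)))))
;; => ([:alice {:country :fr, :song "song-a"}])
```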

join-with-stream

(join-with-stream left-stream right-stream f interval)
(join-with-stream left-stream right-stream f interval options)

Returns a new stream inner joining values from both given streams every time :

  a) They share the same key.

  b) The difference in the timestamps of both records is <= the given interval.
     Cf. dvlopt.kafka for descriptions of time intervals.

Hence, for the same key, several joins might happen within a single interval.

Records with a nil key or nil value are ignored.

Input streams are repartitioned if they were marked for repartitioning and a store will be generated for each of them.

Cf. dvlopt.kstreams.builder for requirements related to joins.

A map of options may be given :

  :dvlopt.kafka/deserializer.key
  :dvlopt.kafka/serializer.key

  :dvlopt.kstreams/left
  :dvlopt.kstreams/right
   Maps which may contain value ser/de :

     :dvlopt.kafka/deserializer.value
     :dvlopt.kafka/serializer.value

   Cf. dvlopt.kafka for description of serializers and deserializers.

  :dvlopt.kstreams.store/retention
   Cf. dvlopt.kstreams.store

Ex. (join-with-stream left-stream
                      right-stream
                      (fn [v-left v-right]
                        (str "left = " v-left " and right = " v-right))
                      [2 :seconds])

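Conditions a) and b) can be checked on a small batch of records in plain Clojure. This is a simplified model, assuming records are [key value timestamp-in-ms] vectors and ignoring the fact that a real stream joins records as they arrive; join-streams is a hypothetical name.

```clojure
;; Plain-Clojure model of a time-bounded inner join (not the library API).
(defn join-streams
  [left right f interval-ms]
  (for [[k-left v-left ts-left]    left
        [k-right v-right ts-right] right
        :when (and (some? k-left)
                   ;; a) Same key.
                   (= k-left k-right)
                   ;; b) Timestamps differ by at most the interval.
                   (<= (Math/abs (long (- ts-left ts-right)))
                       interval-ms))]
    [k-left (f v-left v-right)]))

(join-streams [[:a "l1" 1000]
               [:b "l2" 1000]]
              [[:a "r1" 1500]
               [:a "r2" 2500]
               [:a "r3" 4000]
               [:b "r4" 1000]]
              (fn [v-left v-right]
                (str v-left "+" v-right))
              2000)
;; => ([:a "l1+r1"] [:a "l1+r2"] [:b "l2+r4"])
```

The left record of key :a joins twice because two right records fall within 2 seconds of it, while "r3" is 3 seconds away and does not join.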

join-with-table

(join-with-table left-stream right-table f)
(join-with-table left-stream right-table f options)

Returns a new stream inner joining the given stream with the given table. The join is triggered every time a new record arrives in the stream and the table contains the key of this record. This is very useful for enriching a stream with some up-to-date information.

Records from the input stream with a nil key or value are ignored. Records from the input table with a nil value remove the corresponding key from this table.

The input stream is repartitioned if it was marked for repartitioning.

A map of options may be given, just like in join-with-stream.

Ex. (join-with-table stream
                     table
                     (fn [v-stream v-table]
                       (assoc v-stream
                              :last-known-location
                              (:location v-table))))

left-join-with-global-table

(left-join-with-global-table left-stream right-global-table f-map-k f-join)

Exactly like join-with-global-table but the join is triggered even if the global table does not contain the mapped key. In such a case, the right value supplied during the join is nil.

left-join-with-stream

(left-join-with-stream left-stream right-stream f interval)
(left-join-with-stream left-stream right-stream f interval options)

Exactly like join-with-stream but the join is triggered even if there is no record with the same key yet in the right stream for a given time window. In such a case, the right value provided for the join is nil.

left-join-with-table

(left-join-with-table left-stream right-table f)
(left-join-with-table left-stream right-table f options)

Exactly like join-with-table but the join is triggered even if the table does not contain the key. In such a case, the right value supplied during the join is nil.

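Since the right value may be nil, the join function has to tolerate it. A minimal plain-Clojure sketch, with a regular map standing in for the table; left-join-table and the sample data are hypothetical.

```clojure
;; Plain-Clojure model of a left join against a lookup map (not the library API).
(defn left-join-table
  [records table f]
  (for [[k v] records
        :when (and (some? k) (some? v))]
    ;; `get` returns nil when the table has no entry for `k`.
    [k (f v (get table k))]))

(left-join-table [[:u1 {:name "Ada"}]
                  [:u2 {:name "Bob"}]]
                 {:u1 {:location :paris}}
                 (fn [v-stream v-table]
                   (assoc v-stream
                          :last-known-location
                          (:location v-table))))
;; => ([:u1 {:name "Ada", :last-known-location :paris}]
;;     [:u2 {:name "Bob", :last-known-location nil}])
```

Looking up a keyword in nil returns nil, so this particular join function needs no special case for a missing table entry.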

map-keys

(map-keys stream f)

Returns a new stream efficiently mapping keys from the given one.

Marks the resulting stream for repartitioning. Cf. dvlopt.kstreams.builder

Ex. ;; The key is an IP address mapped to a country.

(map-keys stream
          (fn [k v]
            (country-from-ip k)))

map-kv

(map-kv stream f)

Returns a new stream mapping key-values from the given one.

Marks the resulting stream for repartitioning. Cf. dvlopt.kstreams.builder

Ex. ;; The key is an IP address mapped to a country and the value is a collection mapped to its number of items.

(map-kv stream
        (fn [k v]
          [(country-from-ip k)
           (count v)]))

map-values

(map-values kstream f)

Returns a new stream efficiently mapping values from the given one.

Unlike other mapping functions, does not mark the resulting stream for repartitioning as the keys remain intact.

Ex. ;; The value is a collection mapped to its number of items.

(map-values stream
            (fn [k v]
              (count v)))

merge-stream

(merge-stream stream other-stream)

Returns a new stream merging the two given ones.

There is no guarantee about the ordering of the records between streams. However, order is preserved for each stream.

outer-join-with-stream

(outer-join-with-stream left-stream right-stream f interval)
(outer-join-with-stream left-stream right-stream f interval options)

Exactly like join-with-stream but the join is triggered even if there is no record with the same key yet in the other stream for a given time window. In such a case, the value provided for the missing side of the join is nil.

process

(process stream processor)
(process stream processor options)

Returns a new stream processing the records from the given one using a low-level processor.

For when the high-level API is not enough. It is typically used for an fmap-kv-like behavior involving some state.

Cf. dvlopt.kstreams.topology/add-processor
    :dvlopt.kstreams/processor.on-record may be used to explicitly forward records using the associated context.

Marks the resulting stream for repartitioning. Cf. dvlopt.kstreams.builder

A map of options may be given :

  :dvlopt.kstreams.store/names
   List of state store names, previously added to the underlying builder, that this processor needs to access.
   Cf. dvlopt.kstreams.builder/add-store

process-values

(process-values stream processor)
(process-values stream processor options)

Returns a new stream efficiently processing the values of the given one using a low-level processor.

Unlike process, does not mark the resulting stream for repartitioning as the keys remain intact. Also, because this function is about processing values, there is no point in forwarding records using the associated context, so doing so will throw an exception. Rather, the library maps the original value of the records to the value returned by :dvlopt.kstreams/processor.on-record (which may be nil).

It is typically used when some state is needed.

A map of options may be given, just like in process.

reduce-sessions

(reduce-sessions session-windowed-stream fn-reduce fn-merge fn-seed)
(reduce-sessions session-windowed-stream fn-reduce fn-merge fn-seed options)

Returns a new table aggregating values for each session of each key for the given session-windowed stream.

Sessions might merge, hence the need for a function being able to do so.

A map of standard table options may be given (cf. dvlopt.kstreams.table).

Ex. (reduce-sessions session-windowed-stream
                     (fn reduce [aggregated k v]
                       (+ aggregated
                          v))
                     (fn merge [aggregated-session-1 aggregated-session-2 k]
                       (+ aggregated-session-1
                          aggregated-session-2))
                     (fn seed []
                       0))

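Why a merge function is needed can be seen in a minimal plain-Clojure model of session aggregation for a single key. Everything here is an assumption made for illustration and not the library's implementation: the names add-record and sessionize, the session maps ({:start :end :agg}), and the rule that a record joins every session lying within gap-ms of its timestamp. The three functions take their arguments in the same order as in the example above.

```clojure
;; Plain-Clojure model of session aggregation for one key (not the library API).
(def fns
  {:fn-reduce (fn [aggregated k v] (+ aggregated v))
   :fn-merge  (fn [aggregated-1 aggregated-2 k] (+ aggregated-1 aggregated-2))
   :fn-seed   (fn [] 0)})

(defn add-record
  "Adds value `v` with timestamp `ts` to the sessions of key `k`."
  [sessions gap-ms {:keys [fn-reduce fn-merge fn-seed]} k v ts]
  (let [near?    (fn [{:keys [start end]}]
                   (<= (- start gap-ms) ts (+ end gap-ms)))
        touching (filter near? sessions)
        ;; Every session reached by the record is merged into a single aggregate.
        merged   (reduce (fn [aggregated session]
                           (fn-merge aggregated (:agg session) k))
                         (fn-seed)
                         touching)]
    (conj (vec (remove near? sessions))
          {:start (apply min ts (map :start touching))
           :end   (apply max ts (map :end touching))
           :agg   (fn-reduce merged k v)})))

(defn sessionize
  "Folds [value timestamp] pairs of key `k` into sessions."
  [gap-ms k values-and-timestamps]
  (reduce (fn [sessions [v ts]]
            (add-record sessions gap-ms fns k v ts))
          []
          values-and-timestamps))

(sessionize 2000 :user [[1 0] [2 1000] [4 4500]])
;; => [{:start 0, :end 1000, :agg 3} {:start 4500, :end 4500, :agg 4}]

;; A late record at 2800 ms is within 2000 ms of both sessions, so they merge.
(sessionize 2000 :user [[1 0] [2 1000] [4 4500] [8 2800]])
;; => [{:start 0, :end 4500, :agg 15}]
```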

reduce-values

(reduce-values grouped-stream fn-reduce fn-seed)
(reduce-values grouped-stream fn-reduce fn-seed options)

Returns a new table aggregating values for each key of the given grouped stream.

A map of standard table options may be given (cf. dvlopt.kstreams.table).

Ex. (reduce-values grouped-stream
                   (fn reduce [aggregated k v]
                     (+ aggregated
                        v))
                   (fn seed []
                     0))

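How the seed and reducing functions cooperate can be modelled in plain Clojure, with a map standing in for the resulting table. reduce-by-key is a hypothetical helper written for this sketch; it assumes, as in the example above, that the reducing function receives the current aggregate, the key and the value.

```clojure
;; Plain-Clojure model of per-key aggregation (not the library API).
(defn reduce-by-key
  [records fn-reduce fn-seed]
  (reduce (fn [table [k v]]
            ;; The seed is only used the first time a key is seen.
            (let [aggregated (if (contains? table k)
                               (get table k)
                               (fn-seed))]
              (assoc table k (fn-reduce aggregated k v))))
          {}
          records))

(reduce-by-key [[:a 1] [:b 10] [:a 2]]
               (fn reduce [aggregated k v]
                 (+ aggregated
                    v))
               (fn seed []
                 0))
;; => {:a 3, :b 10}
```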

reduce-windows

(reduce-windows time-windowed-stream fn-reduce fn-seed)
(reduce-windows time-windowed-stream fn-reduce fn-seed options)

Returns a new table aggregating values for each time window of each key of the given time-windowed stream. Cf. reduce-values

A map of standard table options may be given (cf. dvlopt.kstreams.table).

sink-do

(sink-do stream f)

Marks the end of the given stream by performing a side effect for each key-value.

Side effects cannot be tracked by Kafka, hence they do not benefit from Kafka's processing guarantees. Returns nil.

Ex. (sink-do stream
             (fn [k v]
               (println :key k :value v)))

sink-process

(sink-process stream processor)
(sink-process stream processor options)

Marks the end of the given stream by processing each record with a low-level processor.

Returns nil.

A map of options may be given, just like in process.

sink-topic

(sink-topic stream topic)
(sink-topic stream topic options)

Marks the end of the given stream by sending each key-value to a chosen topic.

In this case, topic might be either :

  • [:always TopicName]
    Where TopicName is the name of the unique topic the records will always be sent to.

  • [:select (fn [record])]
    Where the function is used to dynamically select a topic. All the values in the provided record refer to what is known about the original record sourced from Kafka. As such, even ::topic might be missing.

Returns nil.

A map of options may be given, the same as for through-topic.

through-topic

(through-topic stream topic)
(through-topic stream topic options)

Sends each record of the given stream to a given topic and then continues streaming.

Useful when the stream was marked for repartitioning. Streaming to a chosen topic before a stateful operation will avoid automatic repartitioning.

A map of options may be given :

  :dvlopt.kafka/deserializer.key
  :dvlopt.kafka/deserializer.value
  :dvlopt.kafka/serializer.key
  :dvlopt.kafka/serializer.value
   Cf. dvlopt.kafka for description of serializers and deserializers.

  :dvlopt.kstreams/select-partition
   Cf. dvlopt.kstreams.builder

window

(window grouped-stream interval)
(window grouped-stream interval options)

Returns a new time-windowed stream windowing values by a fixed time interval for each key of the given grouped stream.

A map of options may be given :

  :dvlopt.kstreams.store/retention
   Cf. dvlopt.kstreams.store

  ::interval.type
   Fixed time windows can behave in 3 fashions :

     :hopping
      Hopping windows of interval I advance by sub-interval J <= I and are aligned to the epoch, meaning the
      first window starts at time 0. The lower bound is inclusive and the upper bound is exclusive.

        Ex. I = 5 seconds, J = 3 seconds (thus every window overlaps with its neighbor by 2 seconds)

            [0;5), [3;8), [6;11), ...

     :tumbling (default)
      Tumbling windows are simply non-overlapping hopping windows (i.e. I = J).

        Ex. [0;5), [5;10), [10;15), ...

     :sliding
      Sliding windows are aligned to the timestamps of the records and not the epoch. Two records are said to
      be part of the same sliding window if the difference of their timestamps is <= I, where both the lower and
      upper bound are inclusive.

Windows of type :hopping accept the following additional option :

  ::advance-by
   The J sub-interval.

Cf. dvlopt.kafka for description of time intervals.
    dvlopt.kstreams.store for description of time windows.

Ex. ;; Windows of 5 seconds advancing by 3 seconds (thus overlapping 2 seconds).

(window grouped-stream
        [5 :seconds]
        {::interval.type :hopping
         ::advance-by    [3 :seconds]})
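
The window bounds listed above can be reproduced with a little arithmetic. The following plain-Clojure sketch computes which hopping windows contain a given timestamp; hopping-windows is a hypothetical helper, units are seconds as in the example, and it only models the description given here.

```clojure
;; Plain-Clojure model of hopping window assignment (not the library API).
(defn hopping-windows
  "Returns the [start end) windows of `size`, advancing by `advance` and aligned
   to 0, which contain the timestamp `ts`."
  [size advance ts]
  (let [;; Start of the latest window beginning at or before `ts`.
        last-start (* advance (quot ts advance))]
    (->> (iterate (fn [start] (- start advance)) last-start)
         ;; Walk back while the window exists and still covers `ts`.
         (take-while (fn [start]
                       (and (>= start 0)
                            (< ts (+ start size)))))
         (map (fn [start] [start (+ start size)]))
         reverse
         vec)))

;; Hopping: I = 5 seconds, J = 3 seconds.
(hopping-windows 5 3 4)
;; => [[0 5] [3 8]]

(hopping-windows 5 3 5)
;; => [[3 8]]

;; Tumbling: I = J = 5 seconds, each timestamp belongs to exactly one window.
(hopping-windows 5 5 7)
;; => [[5 10]]
```

A record at second 4 therefore contributes to two aggregates when windows hop, and to exactly one when they tumble.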

window-by-session

(window-by-session grouped-stream interval)
(window-by-session grouped-stream interval options)

Returns a new session-windowed stream windowing values by sessions for each key of the given grouped stream.

Sessions are non-fixed intervals of activity between fixed intervals of inactivity.

Cf. dvlopt.kafka for description of time intervals.
Cf. dvlopt.kstreams.store for description of sessions.

A map of options may be given :

  :dvlopt.kstreams.store/retention
   Cf. dvlopt.kstreams.store
