Liking cljdoc? Tell your friends :D

promisespromises.stream.cross-impl


->finalizer-fnclj/s

(->finalizer-fn {finalizer :promisespromises.stream.cross/finalizer
                 :as _cross-spec})
source

->key-comparator-fnclj/s

(->key-comparator-fn {key-comparator
                        :promisespromises.stream.cross/key-comparator
                      :as _cross-spec})
source

->key-extractor-fnclj/s

(->key-extractor-fn key-spec)

given a key-spec, return a key-extractor fn

given a key-spec, return a key-extractor fn
sourceraw docstring

->key-extractor-fnsclj/s

(->key-extractor-fns {keyspecs :promisespromises.stream.cross/keys
                      :as _cross-spec})
source

->merge-fnclj/s

(->merge-fn {op :promisespromises.stream.cross/op :as cross-spec})
source

->product-sort-fnclj/s

(->product-sort-fn {product-sort :promisespromises.stream.cross/product-sort
                    :as _cross-spec})
source

->select-fnclj/s

(->select-fn {op :promisespromises.stream.cross/op :as _cross-spec})
source

buffer-chunk!clj/s

(buffer-chunk!
  partition-buffer
  {key-comparator-fn :promisespromises.stream.cross/key-comparator-fn
   key-extractor-fns :promisespromises.stream.cross/key-extractor-fns
   :as cross-spec}
  stream-id
  stream)

given a stream of chunks of partitions, and a partition-buffer of [key partition] tuples, retrieve another chunk of partitions and add them to the partition-buffer (or add the keyword ::drained if the end of the stream is reached, or ::errored if the stream errored)

returns Promise<[ [<partition-key> <partition>]* ::drained?]

given a stream of chunks of partitions, and a
partition-buffer of [key partition] tuples, retrieve another
chunk of partitions and add them to the partition-buffer
(or add the keyword ::drained if the end of the
 stream is reached, or ::errored if the stream errored)

returns Promise<[ [<partition-key> <partition>]* ::drained?]
sourceraw docstring

chunk-full?clj/s

(chunk-full? chunk-builder
             {target-chunk-size :promisespromises.stream.cross/target-chunk-size
              :as _cross-spec})

should the current chunk be wrapped?

should the current chunk be wrapped?
sourceraw docstring

chunk-not-empty?clj/s

(chunk-not-empty? chunk-builder)
source

configure-cross-opclj/s

(configure-cross-op cross-spec)

assemble helper functions to allow the core cross-stream* impl to perform the specified operation

assemble helper functions to allow the core cross-stream* impl
to perform the specified operation
sourceraw docstring

ConfiguredCrossOperationclj/s

source

crossclj/s

(cross cross-spec id-streams)

cross some sorted streams, returning a stream according to the cross-spec

each input stream must be sorted ascending in the key specified in cross-spec at [::stream.cross/keys <stream-id>] with the comparator fn from ::stream.cross/comparator

  • cross-spec : a description of the operation to cross the streams
  • id-streams : {<stream-id> <stream>}

e.g. this invocations inner-joins a stream of users, sorted by :org-id, to a stream of orgs, sorted by :id

(cross {::stream.cross/keys {:users :org-id :orgs :id} ::stream.cross/op ::stream.cross/inner-join} {:users <users-stream> :orgs <orgs-stream>})

cross some sorted streams, returning a stream according to the cross-spec

each input stream must be sorted ascending in the key specified in cross-spec
at
  [::stream.cross/keys <stream-id>]
with the comparator fn from ::stream.cross/comparator

- cross-spec : a description of the operation to cross the streams
- id-streams : {<stream-id> <stream>}

e.g. this invocations inner-joins a stream of users, sorted by :org-id, to a
  stream of orgs, sorted by :id

(cross
   {::stream.cross/keys {:users :org-id :orgs :id}
    ::stream.cross/op ::stream.cross/inner-join}
   {:users <users-stream>
    :orgs <orgs-stream>})
sourceraw docstring

cross*clj/s

(cross* cross-spec id-streams)

the implementation, which relies on the support functions:

  • select-fn - select from partitions with matching keys
  • merge-fn - merge records from multiple streams with matching keys,
  • product-sort-fn - sort a merged cartesian product of records with matching keys from multiple streams
  • key-comparator-fn - compare keys, like compare
  • key-extractor-fns - extract a key from a value on a stream

and proceeds iteratively like so:

  • fill any partition buffers requiring it
  • find the minimum key-value from all the lead partitions
  • use the select-fn to select from the lead partitions with the minimum-key-value: [[<stream-id> <partition>]+], taking only the selected partitions from their respective buffers
  • generate a cartesian product from the selected [[<stream-id> <partition>]+] partitions
  • merge the records from each row of the cartesian product - i.e. one record from each stream
  • sort the resulting list of merged records with the product-sort-fn
  • add the sorted list of merged records to the current chunk
  • output the chunk if it's full
the implementation, which relies on the support functions:

 - select-fn - select from partitions with matching keys
 - merge-fn - merge records from multiple streams with matching keys,
 - product-sort-fn - sort a merged cartesian product of records with
     matching keys from multiple streams
 - key-comparator-fn - compare keys, like `compare`
 - key-extractor-fns - extract a key from a value on a stream

and proceeds iteratively like so:

- fill any partition buffers requiring it
- find the minimum key-value from all the lead partitions
- use the select-fn to select from the lead partitions with the
  minimum-key-value: [[<stream-id> <partition>]+], taking only the
  selected partitions from their respective buffers
- generate a cartesian product from the selected [[<stream-id> <partition>]+]
  partitions
- merge the records from each row of the cartesian product - i.e. one record
  from each stream
- sort the resulting list of merged records with the product-sort-fn
- add the sorted list of merged records to the current chunk
- output the chunk if it's full
  
sourceraw docstring

cross-finished?clj/s

(cross-finished? id-partition-buffers)
source

cross-input-errored?clj/s

(cross-input-errored? id-partition-buffers)
source

CrossSpecclj/s

source

CrossStreamsOpclj/s

source

CrossSupportFnsclj/s

the fns which implement cross operation behaviours, all derived from the config in CrossSpec and defaults

the fns which implement cross operation behaviours, all derived from the
config in CrossSpec and defaults
sourceraw docstring

default-target-chunk-sizeclj/s

source

fill-partition-buffers!clj/s

(fill-partition-buffers! id-partition-buffers cross-spec id-streams)

buffer another chunk from any streams which are down to a single partition and have not yet been stream-finished-drained-marker

buffer another chunk from any streams which are down to a single
partition and have not yet been stream-finished-drained-marker
sourceraw docstring

first-cross-input-errorclj/s

(first-cross-input-error id-partition-buffers)

use the first input error for an output error

use the first input error for an output error
sourceraw docstring

generate-outputclj/s

(generate-output {merge-fn :promisespromises.stream.cross/merge-fn
                  product-sort-fn :promisespromises.stream.cross/product-sort-fn
                  finalizer-fn :promisespromises.stream.cross/finalizer-fn
                  :as _cross-spec}
                 selected-id-partitions)

given partition-selections, cartesion-product the selected partitions, merging each row into a {<stream-id> <val>} map, and applying the merge-fn and any finalizer

given partition-selections, cartesion-product the selected partitions,
merging each row into a {<stream-id> <val>} map, and applying the
merge-fn and any finalizer
sourceraw docstring

IdStreamsclj/s

id->stream mappings, either in a map, or a list of pairs - the latter providing order for operations like n-left-join which require it

id->stream mappings, either in a map, or a
list of pairs - the latter providing order for
operations like n-left-join which require it
sourceraw docstring

init-partition-buffers!clj/s

(init-partition-buffers! cross-spec id-streams)

returns partition buffers for each stream with partitions from the first chunk

returns partition buffers for each stream with
partitions from the first chunk
sourceraw docstring

KeySpecclj/s

source

make-merge-differenceclj/s

(make-merge-difference {kxfns :promisespromises.stream.cross/key-extractor-fns
                        :as _cross-spec})
source

make-merge-inner-joinclj/s

(make-merge-inner-join {kxfns :promisespromises.stream.cross/key-extractor-fns
                        :as _cross-spec})
source

make-merge-intersectclj/s

(make-merge-intersect {kxfns :promisespromises.stream.cross/key-extractor-fns
                       :as _cross-spec})
source

make-merge-n-left-joinclj/s

(make-merge-n-left-join {kxfns :promisespromises.stream.cross/key-extractor-fns
                         n :promisespromises.stream.cross.op.n-left-join/n
                         :as _cross-spec})
source

merge-sorted-mergeclj/s

(merge-sorted-merge m)
source

min-key-valclj/s

(min-key-val key-comparator-fn ks)

uses the comparator to find the minimum key value from ks

uses the comparator to find the minimum key value from ks
sourceraw docstring

next-selectionsclj/s

(next-selections {select-fn :promisespromises.stream.cross/select-fn
                  key-comparator-fn
                    :promisespromises.stream.cross/key-comparator-fn
                  :as _cross-spec}
                 id-partition-buffers)

select partitions for the operation return [[[<stream-id> <partition>]+] updated-id-partition-buffers]

select partitions for the operation
return [[[<stream-id> <partition>]+] updated-id-partition-buffers]
sourceraw docstring

OrderedKeySpecsclj/s

source

partition-buffer-content-drained?clj/s

(partition-buffer-content-drained? partition-buffer)

returns true when a partition-buffer has no more content and the associated stream is finished (drained or errored)

returns true when a partition-buffer has no more content
and the associated stream is finished (drained or errored)
sourceraw docstring

partition-buffer-errored?clj/s

(partition-buffer-errored? partition-buffer)

returns true when a partition-buffer has no more content and the associated stream errored

returns true when a partition-buffer has no more content
and the associated stream errored
sourceraw docstring

partition-buffer-needs-filling?clj/s

(partition-buffer-needs-filling? stream-id partition-buffer)

don't wait until empty

don't wait until empty 
sourceraw docstring

partition-streamclj/s

(partition-stream {target-chunk-size
                     :promisespromises.stream.cross/target-chunk-size
                   kxfns :promisespromises.stream.cross/key-extractor-fns
                   :as _cross-spec}
                  stream-id
                  stream)
source

partition-streamsclj/s

(partition-streams {kxfns :promisespromises.stream.cross/key-extractor-fns
                    :as cross-spec}
                   id-streams)

returns a linked/map with {<stream-id> <partitioned-stream>}, and in the same order as specifed in the ::stream.cross/keys config

returns a linked/map with {<stream-id> <partitioned-stream>}, and
in the same order as specifed in the ::stream.cross/keys config
sourceraw docstring

select-allclj/s

(select-all id-partitions)

select-fn which takes all offered id-partitions

select-fn which takes all offered id-partitions
sourceraw docstring

select-firstclj/s

(select-first id-partitions)

select-fn which takes the first id-partition from the offered list of id-partitions

select-fn which takes the first id-partition from the offered
list of id-partitions
sourceraw docstring

set-select-allclj/s

(set-select-all id-partitions)

select-fn which takes all offered id-partitions and additionlly checks that no partition has more than a single element (as required of a set)

select-fn which takes all offered id-partitions and additionlly checks
that no partition has more than a single element (as required of a set)
sourceraw docstring

stream-finished-drained-markerclj/s

source

stream-finished-errored-markerclj/s

source

stream-finished-markersclj/s

the partition buffer marker values indicating that a stream is finished

the partition buffer marker values indicating that
a stream is finished
sourceraw docstring

stream-finished?clj/s

(stream-finished? partition-buffer)

takes a partition-buffer and returns true if there are no more values to consumer from the corresponding stream

takes a partition-buffer and returns true
if there are no more values to consumer from the
corresponding stream
sourceraw docstring

values-sorted?clj/s

(values-sorted? comparator-fn vs)

returns true if vs are sorted according to comparator-fn

returns true if vs are sorted according to comparator-fn
sourceraw docstring

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close