(->key-comparator-fn {key-comparator :prpr3.stream.cross/key-comparator
:as _cross-spec})
(->key-extractor-fn key-spec)
given a key-spec, return a key-extractor fn
(->key-extractor-fns {keyspecs :prpr3.stream.cross/keys :as _cross-spec})
(->product-sort-fn {product-sort :prpr3.stream.cross/product-sort
:as _cross-spec})
(buffer-chunk! partition-buffer
{key-comparator-fn :prpr3.stream.cross/key-comparator-fn
key-extractor-fns :prpr3.stream.cross/key-extractor-fns
:as cross-spec}
stream-id
stream)
given a stream of chunks of partitions and a partition-buffer of [key partition] tuples, retrieve another chunk of partitions and add them to the partition-buffer (or add the keyword ::drained if the end of the stream is reached, or ::errored if the stream errored)
returns Promise<[ [<partition-key> <partition>]* ::drained?]>
(chunk-full? chunk-builder
{target-chunk-size :prpr3.stream.cross/target-chunk-size
:as _cross-spec})
should the current chunk be wrapped?
(configure-cross-op cross-spec)
assemble helper functions to allow the core cross-stream* impl to perform the specified operation
(cross cross-spec id-streams)
cross some sorted streams, returning a stream according to the cross-spec
each input stream must be sorted ascending in the key specified in cross-spec at [::stream.cross/keys <stream-id>] with the comparator fn from ::stream.cross/comparator
- cross-spec : a description of the operation to cross the streams
- id-streams : {<stream-id> <stream>}
e.g. this invocation inner-joins a stream of users, sorted by :org-id, to a stream of orgs, sorted by :id
(cross {::stream.cross/keys {:users :org-id :orgs :id}
        ::stream.cross/op ::stream.cross/inner-join}
       {:users <users-stream> :orgs <orgs-stream>})
(cross* cross-spec id-streams)
the implementation, which relies on the support functions:
- select-fn - select from partitions with matching keys
- merge-fn - merge records from multiple streams with matching keys
- product-sort-fn - sort a merged cartesian product of records with matching keys from multiple streams
- key-comparator-fn - compare keys, like `compare`
- key-extractor-fns - extract a key from a value on a stream
and proceeds iteratively like so:
- fill any partition buffers requiring it
- find the minimum key-value from all the lead partitions
- use the select-fn to select from the lead partitions with the minimum-key-value: [[<stream-id> <partition>]+], taking only the selected partitions from their respective buffers
- generate a cartesian product from the selected [[<stream-id> <partition>]+] partitions
- merge the records from each row of the cartesian product - i.e. one record from each stream
- sort the resulting list of merged records with the product-sort-fn
- add the sorted list of merged records to the current chunk
- output the chunk if it's full
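The iteration described above can be sketched over plain in-memory vectors. This is only an illustration of the idea, not the library's code: vectors stand in for streams, the operation is fixed to an inner join, the product sort, chunking and all asynchrony are left out, and every `sketch-` name is hypothetical.

```clojure
;; Sketch only: vectors stand in for streams; all names are hypothetical.

(defn sketch-partitions
  "Split a key-sorted seq of records into a vector of [key partition] tuples."
  [key-fn records]
  (mapv (fn [p] [(key-fn (first p)) (vec p)])
        (partition-by key-fn records)))

(defn sketch-cartesian-product
  "Cartesian product of [[stream-id partition]+],
   giving one {stream-id record} map per row."
  [id-partitions]
  (reduce (fn [rows [id partition]]
            (for [row rows, record partition]
              (assoc row id record)))
          [{}]
          id-partitions))

(defn sketch-inner-join
  "Inner-join key-sorted inputs. key-fns and inputs are maps keyed by stream-id."
  [key-fns inputs]
  (loop [buffers (into {}
                       (map (fn [[id records]]
                              [id (sketch-partitions (key-fns id) records)]))
                       inputs)
         out []]
    (if (some empty? (vals buffers))
      ;; once any input is exhausted an inner join can produce nothing more
      out
      (let [lead-keys (map (comp first first) (vals buffers))
            ;; minimum key among the lead partitions
            min-k (reduce (fn [a b] (if (pos? (compare a b)) b a)) lead-keys)
            ;; lead partitions whose key equals the minimum key
            selected (for [[id buffer] buffers
                           :let [[k partition] (first buffer)]
                           :when (zero? (compare k min-k))]
                       [id partition])
            ;; only emit rows when every stream has the minimum key
            rows (when (= (count selected) (count buffers))
                   (sketch-cartesian-product selected))]
        ;; drop the selected lead partitions from their buffers
        (recur (reduce (fn [bs id] (update bs id subvec 1))
                       buffers
                       (map first selected))
               (into out rows))))))
```

Called with `{:users :org-id :orgs :id}` as key-fns and `{:users [...] :orgs [...]}` as inputs, it returns a vector of `{:users <user> :orgs <org>}` maps, one per matching pair, with unmatched orgs and users dropped.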
the fns which implement cross operation behaviours, all derived from the config in CrossSpec and defaults
(fill-partition-buffers! id-partition-buffers cross-spec id-streams)
buffer another chunk from any streams which are down to a single partition and whose buffer does not yet carry the stream-finished-drained-marker
(first-cross-input-error id-partition-buffers)
use the first input error for an output error
(generate-output {merge-fn :prpr3.stream.cross/merge-fn
product-sort-fn :prpr3.stream.cross/product-sort-fn
finalizer-fn :prpr3.stream.cross/finalizer-fn
:as _cross-spec}
selected-id-partitions)
given partition-selections, take the cartesian product of the selected partitions, merging each row into a {<stream-id> <val>} map, and applying the merge-fn and any finalizer
id->stream mappings, either in a map, or a list of pairs - the latter providing order for operations like n-left-join which require it
(init-partition-buffers! cross-spec id-streams)
returns partition buffers for each stream with partitions from the first chunk
(make-merge-difference {kxfns :prpr3.stream.cross/key-extractor-fns
:as _cross-spec})
(make-merge-inner-join {kxfns :prpr3.stream.cross/key-extractor-fns
:as _cross-spec})
(make-merge-intersect {kxfns :prpr3.stream.cross/key-extractor-fns
:as _cross-spec})
(make-merge-n-left-join {kxfns :prpr3.stream.cross/key-extractor-fns
n :prpr3.stream.cross.op.n-left-join/n
:as _cross-spec})
(min-key-val key-comparator-fn ks)
uses the comparator to find the minimum key value from ks
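One way to find a minimum under an arbitrary comparator is a single reduce. The sketch below is an assumption about the general technique rather than the library's implementation; `sketch-min-key-val` is a hypothetical name, and returning nil for empty input is a choice made here for illustration.

```clojure
;; Sketch only: hypothetical name, not the library's implementation.
(defn sketch-min-key-val
  "Return the smallest of ks according to comparator-fn, or nil if ks is empty."
  [comparator-fn ks]
  (when (seq ks)
    (reduce (fn [mn k]
              ;; keep the current minimum unless k sorts strictly before it
              (if (neg? (comparator-fn k mn)) k mn))
            ks)))
```

Because the comparator is passed in, the same function finds the "minimum" under a reversed ordering, e.g. with `(fn [a b] (compare b a))`.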
(next-selections {select-fn :prpr3.stream.cross/select-fn
key-comparator-fn :prpr3.stream.cross/key-comparator-fn
:as _cross-spec}
id-partition-buffers)
select partitions for the operation; returns [[[<stream-id> <partition>]+] updated-id-partition-buffers]
(partition-buffer-content-drained? partition-buffer)
returns true when a partition-buffer has no more content and the associated stream is finished (drained or errored)
(partition-buffer-errored? partition-buffer)
returns true when a partition-buffer has no more content and the associated stream errored
(partition-buffer-needs-filling? stream-id partition-buffer)
don't wait until empty
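The three partition-buffer predicates can be illustrated with a small sketch. It assumes a buffer is a vector of [key partition] tuples with an optional trailing marker keyword, as the buffer-chunk! return shape suggests. The keywords `:sketch/drained` and `:sketch/errored` stand in for the library's `::drained` and `::errored`, and all function names are hypothetical. Refilling at one remaining partition, rather than zero, is assumed here to guard against the last buffered partition continuing in the next chunk.

```clojure
;; Sketch only: buffer representation and all names are assumptions.
(def sketch-finished-markers #{:sketch/drained :sketch/errored})

(defn sketch-stream-finished?
  "True when the buffer ends with a finished marker."
  [partition-buffer]
  (contains? sketch-finished-markers (peek partition-buffer)))

(defn sketch-content-drained?
  "True when only a finished marker is left in the buffer."
  [partition-buffer]
  (and (= 1 (count partition-buffer))
       (sketch-stream-finished? partition-buffer)))

(defn sketch-errored?
  "True when only the errored marker is left in the buffer."
  [partition-buffer]
  (= [:sketch/errored] partition-buffer))

(defn sketch-needs-filling?
  "True when the stream is not finished and at most one partition is buffered."
  [partition-buffer]
  (and (not (sketch-stream-finished? partition-buffer))
       (<= (count partition-buffer) 1)))
```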
(partition-stream {target-chunk-size :prpr3.stream.cross/target-chunk-size
kxfns :prpr3.stream.cross/key-extractor-fns
:as _cross-spec}
stream-id
stream)
(partition-streams {kxfns :prpr3.stream.cross/key-extractor-fns :as cross-spec}
id-streams)
returns a linked/map with {<stream-id> <partitioned-stream>}, in the same order as specified in the ::stream.cross/keys config
(select-all id-partitions)
select-fn which takes all offered id-partitions
(select-first id-partitions)
select-fn which takes the first id-partition from the offered list of id-partitions
(set-select-all id-partitions)
select-fn which takes all offered id-partitions and additionally checks that no partition has more than a single element (as required of a set)
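The three select-fns described above might look roughly like the sketch below. It assumes id-partitions is a sequence of [stream-id partition] pairs and that a select-fn returns the pairs it takes. The `sketch-` names, the return shape and the error data are illustrative assumptions, not the library's API.

```clojure
;; Sketch only: hypothetical names and return shapes.
(defn sketch-select-all
  "Take every offered [stream-id partition] pair."
  [id-partitions]
  id-partitions)

(defn sketch-select-first
  "Take only the first offered [stream-id partition] pair."
  [id-partitions]
  (take 1 id-partitions))

(defn sketch-set-select-all
  "Take every offered pair, but throw if any partition holds more than
   one element, since a set cannot contain duplicates of a key."
  [id-partitions]
  (doseq [[stream-id partition] id-partitions]
    (when (> (count partition) 1)
      (throw (ex-info "partition has more than one element"
                      {:stream-id stream-id
                       :size (count partition)}))))
  id-partitions)
```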
the partition buffer marker values indicating that a stream is finished
(stream-finished? partition-buffer)
takes a partition-buffer and returns true if there are no more values to consume from the corresponding stream
(values-sorted? comparator-fn vs)
returns true if vs are sorted according to comparator-fn
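A sortedness check against a comparator can be written by comparing each adjacent pair. The sketch below shows that general technique under a hypothetical name, and treats equal neighbours as sorted, which is an assumption about the intended behaviour.

```clojure
;; Sketch only: hypothetical name, not the library's implementation.
(defn sketch-values-sorted?
  "True when no element of vs sorts after its successor under comparator-fn."
  [comparator-fn vs]
  (every? (fn [[a b]] (not (pos? (comparator-fn a b))))
          ;; sliding window of adjacent pairs; empty for 0 or 1 elements
          (partition 2 1 vs)))
```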