Protocol-based parallel reduction architecture and helper functions.
(->consumer cfn)
Return an instance of a consumer, double consumer, or long consumer.
(bind-double-consumer-reducer! ctor)
(bind-double-consumer-reducer! cls-type ctor)
Bind a classtype as a double consumer parallel reducer - the consumer must implement DoubleConsumer, ham_fisted.Reducible, and IDeref.
(compose-reducers reducers)
(compose-reducers options reducers)
Given a map or sequence of reducers, return a new reducer that produces a map or vector of results.

If data is a sequence then context is guaranteed to be an object array.

Options:

* `:rfn-datatype` - One of nil, `:int64`, or `:float64`. This indicates that the rfns should all uniformly accept longs, doubles, or generic objects. Defaults to nil.
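The idea of composing several reducers into one pass can be sketched with plain `clojure.core`. This is only an illustration of the concept, not how `compose-reducers` is implemented: `compose-sketch` and the `:init`/`:rf`/`:fin` map keys are hypothetical names, and a vector stands in for the object array context.

```clojure
;; Hypothetical helper, not part of ham-fisted: each "reducer" here is just
;; a map with an :init thunk and an :rf step function.
(defn compose-sketch
  [reducers]
  (let [ks (vec (keys reducers))
        rs (mapv reducers ks)]
    {:init (fn [] (mapv (fn [r] ((:init r))) rs))
     :rf   (fn [accs v]
             ;; advance every accumulator with the same element
             (mapv (fn [r acc] ((:rf r) acc v)) rs accs))
     ;; pair each finished accumulator with its original key
     :fin  (fn [accs] (zipmap ks accs))}))

(def sum-and-max
  (compose-sketch {:sum {:init (constantly 0) :rf +}
                   :max {:init (constantly Long/MIN_VALUE) :rf max}}))

(def composed-result
  ((:fin sum-and-max)
   (reduce (:rf sum-and-max) ((:init sum-and-max)) (range 1 11))))
;; composed-result => {:sum 55, :max 10}
```

The input is traversed once, and each element is handed to every accumulator in turn.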
(consume! consumer coll)
Consume a collection. This is simply a reduction where the return value is ignored.

Returns the consumer.
Generic reduction function using a consumer
(consumer-preducer constructor)
Bind a consumer as a parallel reducer.

Consumer must implement java.util.function.Consumer, ham_fisted.Reducible and clojure.lang.IDeref.

Returns instance of type bound.

See documentation for [[declare-double-consumer-preducer!]].
(consumer-reducer ctor)
Make a parallel consumer reducer given a function that takes no arguments and is guaranteed to produce a consumer which also implements Reducible and IDeref.
(double-accumulator accvar varvar & code)
Type-hinted double reduction accumulator:

```clojure
ham-fisted.api> (reduce (double-accumulator acc v (+ (double acc) v))
                        0.0
                        (range 1000))
#<SimpleSum@2fbcf20: 499500.0>
ham-fisted.api> @*1
499500.0
```
Converts from a double consumer to a double reduction accumulator that returns the consumer:

```clojure
ham-fisted.api> (reduce double-consumer-accumulator
                        (Sum$SimpleSum.)
                        (range 1000))
#<SimpleSum@2fbcf20: 499500.0>
ham-fisted.api> @*1
499500.0
```
(double-consumer-preducer constructor)
Return a preducer for a double consumer.

Consumer must implement java.util.function.DoubleConsumer, ham_fisted.Reducible and clojure.lang.IDeref.

```clojure
user> (require '[ham-fisted.api :as hamf])
nil
user> (import '[java.util.function DoubleConsumer])
java.util.function.DoubleConsumer
user> (import [ham_fisted Reducible])
ham_fisted.Reducible
user> (import '[clojure.lang IDeref])
clojure.lang.IDeref
user> (deftype MeanR [^{:unsynchronized-mutable true :tag 'double} sum
                     ^{:unsynchronized-mutable true :tag 'long} n-elems]
        DoubleConsumer
        (accept [this v] (set! sum (+ sum v)) (set! n-elems (unchecked-inc n-elems)))
        Reducible
        (reduce [this o]
          (set! sum (+ sum (.-sum ^MeanR o)))
          (set! n-elems (+ n-elems (.-n-elems ^MeanR o)))
          this)
        IDeref (deref [this] (/ sum n-elems)))
user.MeanR
user> (hamf/declare-double-consumer-preducer! MeanR (MeanR. 0 0))
nil
user> (hamf/preduce-reducer (double-consumer-preducer #(MeanR. 0 0)) (hamf/range 200000))
99999.5
```
(double-consumer-reducer ctor)
Make a parallel double consumer reducer given a function that takes no arguments and is guaranteed to produce a double consumer which also implements Reducible and IDeref
(immut-map-kv ks vs)
(immut-map-kv keyfn valfn data)
(indexed-accum accvar idxvar varvar & code)
Create an indexed accumulator that receives an additional long index during a reduction:

```clojure
ham-fisted.api> (reduce (indexed-accum
                         acc idx v (conj acc [idx v]))
                        []
                        (range 5))
[[0 0] [1 1] [2 2] [3 3] [4 4]]
```
(indexed-double-accum accvar idxvar varvar & code)
Create an indexed double accumulator that receives an additional long index during a reduction:

```clojure
ham-fisted.api> (reduce (indexed-double-accum
                         acc idx v (conj acc [idx v]))
                        []
                        (range 5))
[[0 0.0] [1 1.0] [2 2.0] [3 3.0] [4 4.0]]
```
(indexed-long-accum accvar idxvar varvar & code)
Create an indexed long accumulator that receives an additional long index during a reduction:

```clojure
ham-fisted.api> (reduce (indexed-long-accum
                         acc idx v (conj acc [idx v]))
                        []
                        (range 5))
[[0 0] [1 1] [2 2] [3 3] [4 4]]
```
(long-accumulator accvar varvar & code)
Type-hinted long reduction accumulator. Usage mirrors `double-accumulator`:

```clojure
ham-fisted.api> (reduce (double-accumulator acc v (+ (double acc) v))
                        0.0
                        (range 1000))
#<SimpleSum@2fbcf20: 499500.0>
ham-fisted.api> @*1
499500.0
```
Converts from a long consumer to a long reduction accumulator that returns the consumer:

```clojure
ham-fisted.api> (reduce double-consumer-accumulator
                        (Sum$SimpleSum.)
                        (range 1000))
#<SimpleSum@2fbcf20: 499500.0>
ham-fisted.api> @*1
499500.0
```
(long-consumer-reducer ctor)
Make a parallel long consumer reducer given a function that takes no arguments and is guaranteed to produce a long consumer which also implements Reducible and IDeref.
(options->parallel-options options)
Convert an options map to a parallel options object.

Options:

* `:pool` - Supply the forkjoinpool to use.
* `:max-batch-size` - Defaults to 64000. Used for index-based parallel pathways to control the size of each parallelized batch.
* `:ordered?` - When true, process inputs and provide results in order.
* `:parallelism` - The amount of parallelism to expect. Defaults to the number of threads in the fork-join pool provided.
* `:cat-parallelism` - Either `:seq-wise` or `:elem-wise`. When parallelizing over a concatenation of containers, either parallelize each container individually, meaning call preduce on each container using many threads per container (`:elem-wise`), or use one thread per container (`:seq-wise`). Defaults to `:seq-wise` as this doesn't require each container itself to support parallelization but relies on the sequence of containers being long enough to saturate the processor. Can also be set at time of container construction - see [[lazy-noncaching/concat-opts]].
* `:put-timeout-ms` - The time to wait to put data into the queue. This is a safety mechanism so that if the processing system fails we don't just keep putting things into a queue.
* `:unmerged-result?` - Use with care. For parallel reductions, do not perform the merge step but instead return the sequence of partially reduced results.
* `:n-lookahead` - How far to look ahead for pmap and upmap to add new jobs to the queue. Defaults to `(* 2 parallelism)`.
(parallel-reducer init-fn rfn merge-fn)
(parallel-reducer init-fn rfn merge-fn fin-fn)
Implement a parallel reducer by explicitly passing in the various required functions.

* `init-fn` - Takes no arguments and returns a new accumulation target.
* `rfn` - clojure rf function - takes two arguments, the accumulation target and a new value, and produces a new accumulation target.
* `merge-fn` - Given two accumulation targets, returns a new combined accumulation target.
* `fin-fn` - optional - Given an accumulation target, returns the desired final type.

```clojure
user> (hamf-rf/preduce-reducer
       (hamf-rf/parallel-reducer
        hamf/mut-set
        #(do (.add ^java.util.Set %1 %2) %1)
        hamf/union
        hamf/sort)
       (lznc/map (fn ^long [^long v] (rem v 13)) (hamf/range 1000000)))
[0 1 2 3 4 5 6 7 8 9 10 11 12]
```
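How the four functions fit together can be sketched with a serial simulation in plain `clojure.core`. This is an assumption-laden illustration, not the library's scheduling: `chunked-reduce-sketch` is a hypothetical helper, and each fixed-size chunk merely stands in for one parallel batch.

```clojure
(require '[clojure.set :as set])

(defn chunked-reduce-sketch
  "Serial simulation of a parallel reduction: each chunk plays the role of
  one batch. Hypothetical helper, not a ham-fisted function."
  [init-fn rfn merge-fn fin-fn chunk-size coll]
  (->> (partition-all chunk-size coll)
       ;; every batch starts from its own fresh accumulator
       (map (fn [chunk] (reduce rfn (init-fn) chunk)))
       ;; partial results are combined pairwise
       (reduce merge-fn)
       ;; finalize the merged accumulator
       (fin-fn)))

(def distinct-remainders
  (chunked-reduce-sketch
   (fn [] #{})              ; init-fn
   conj                     ; rfn
   set/union                ; merge-fn
   (fn [s] (vec (sort s)))  ; fin-fn
   1000
   (map #(rem % 13) (range 100000))))
;; distinct-remainders => [0 1 2 3 4 5 6 7 8 9 10 11 12]
```

Because each batch gets its own accumulator from `init-fn`, the `rfn` may mutate its target safely, as long as `merge-fn` can combine any two partial results.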
(preduce init-val-fn rfn merge-fn coll)
(preduce init-val-fn rfn merge-fn options coll)
Parallelized reduction. Currently coll must either be random access or a lznc map/filter chain based on one or more random access entities, hashmaps and sets from this library, or any java.util set, hashmap, or concurrent versions of these. If the input cannot be parallelized, this lowers to a normal serial reduction.

For potentially small-n invocations, providing the parallel options explicitly can improve performance by a surprising amount, because converting the options map to the parallel options object takes a bit of time.

* `init-val-fn` - Potentially called in reduction threads to produce each initial value.
* `rfn` - normal clojure reduction function. Typehinting the second argument to double or long will sometimes produce a faster reduction.
* `merge-fn` - Merge two reduction results into one.

Options:

* `:pool` - The fork-join pool to use. Defaults to the common pool, which assumes the reduction is cpu-bound.
* `:parallelism` - What parallelism to use. Defaults to the pool's `getParallelism` method.
* `:max-batch-size` - Rough maximum batch size for indexed or grouped reductions. This can both even out batch times and ensure you don't get into safepoint trouble with jdk-8.
* `:min-n` - Minimum number of elements before initiating a parallelized reduction. Defaults to 1000, but you should customize this for your specific reduction.
* `:ordered?` - True if results should be in order. Unordered results are sometimes slightly faster, but again you should test for your specific situation.
* `:cat-parallelism` - Either `:seq-wise` or `:elem-wise`, defaults to `:seq-wise`. Test for your specific situation, as this really is data-dependent. This controls how a concat primitive parallelizes the reduction across its containers. Elemwise means each container's reduction is individually parallelized, while seqwise indicates to do a pmap-style initial reduction across containers and then merge the results.
* `:put-timeout-ms` - Number of milliseconds to wait for queue space before throwing an exception in unordered reductions. Defaults to 50000.
* `:unmerged-result?` - Defaults to false. When true, the sequence of results will be returned directly without any merge steps in a lazy-noncaching container. Beware the noncaching aspect: repeatedly evaluating this result may kick off the parallelized reduction multiple times. To ensure caching, if unsure, call `seq` on the result.
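A minimal usage sketch follows, assuming the `ham-fisted.api` and `ham-fisted.reduce` namespaces are on the classpath under the aliases used elsewhere on this page. It builds the parallel options object once, as the note about small-n invocations suggests, and passes it in the options position. The names `popts`, `parallel-sum`, and `total` are illustrative only.

```clojure
(require '[ham-fisted.api :as hamf]
         '[ham-fisted.reduce :as hamf-rf])

;; Build the parallel options object once so repeated calls skip the
;; map-to-options conversion.
(def popts (hamf-rf/options->parallel-options {:min-n 1000 :ordered? true}))

(defn parallel-sum
  [coll]
  (hamf-rf/preduce (constantly 0)          ; init-val-fn: fresh accumulator per batch
                   (fn [acc v] (+ acc v))  ; rfn: fold one element into the accumulator
                   +                       ; merge-fn: combine two partial sums
                   popts
                   coll))

(def total (parallel-sum (hamf/range 100000)))
```

Addition is associative, so the merged result should not depend on how the input is split into batches.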
(preduce-reducer reducer coll)
(preduce-reducer reducer options coll)
Given an instance of [[ham-fisted.protocols/ParallelReducer]], perform a parallel reduction.

In the case where the result is requested unmerged, finalize will be called on each result in a lazy noncaching way. In this case you can use a non-parallelized reducer and simply get a sequence of results as opposed to one.

* `reducer` - instance of ParallelReducer.
* `options` - Same options as preduce.
* `coll` - something potentially with a parallelizable reduction.

See options for [[ham-fisted.reduce/preduce]].

Additional Options:

* `:skip-finalize?` - when true, the reducer's finalize method is not called on the result.
(preduce-reducers reducers coll)
(preduce-reducers reducers options coll)
Given a map or sequence of [[ham-fisted.protocols/ParallelReducer]], produce a map or sequence of reduced values. Reduces over input coll once in parallel if coll is large enough. See options for [[ham-fisted.reduce/preduce]].

```clojure
ham-fisted.api> (preduce-reducers {:sum (Sum.) :mult *} (range 20))
{:mult 0, :sum #<Sum@5082c3b7: {:sum 190.0, :n-elems 20}>}
```
(reduce-reducer reducer coll)
Serially reduce a reducer.

```clojure
ham-fisted.api> (reduce-reducer (Sum.) (range 1000))
#<Sum@afbedb: {:sum 499500.0, :n-elems 1000}>
```
(reduce-reducers reducers coll)
Serially reduce a map or sequence of reducers into a map or sequence of results.

```clojure
ham-fisted.api> (reduce-reducers {:a (Sum.) :b *} (range 1 21))
{:b 2432902008176640000, :a #<Sum@6bcebeb1: {:sum 210.0, :n-elems 20}>}
```
(reducer->completef reducer)
Return fold-compatible pair of [reducef, completef] given a parallel reducer. Note that folded reducers are not finalized as of this time:

```clojure
ham-fisted.api> (def data (vec (range 200000)))
#'ham-fisted.api/data
ham-fisted.api> (r/fold (reducer->completef (Sum.)) (reducer->rfn (Sum.)) data)
#<Sum@858c206: {:sum 1.99999E10, :n-elems 200000}>
```
(reducer->rf reducer)
Given a reducer, return a transduce-compatible rf:

```clojure
ham-fisted.api> (transduce (clojure.core/map #(+ % 2)) (reducer->rf (Sum.)) (range 200))
{:sum 20300.0, :n-elems 200}
```
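For context, a reducing function is transduce-compatible when it supplies three arities: init, completion, and step. The hand-written `mean-rf` below is a hypothetical example of that shape in plain `clojure.core`. It does not show how `reducer->rf` is implemented.

```clojure
(defn mean-rf
  "Hand-rolled reducing function with the three arities transduce expects."
  ;; init: called when transduce is given no initial value
  ([] [0.0 0])
  ;; completion: turn the accumulator into the final result
  ([[sum n]] (when (pos? n) (/ sum n)))
  ;; step: fold one value into the accumulator
  ([[sum n] v] [(+ sum v) (inc n)]))

(def mean-result
  (transduce (map #(+ % 2)) mean-rf (range 200)))
;; mean-result => 101.5
```

The completion arity plays the role that finalization plays for a reducer: the accumulator stays in its working form until the reduction ends.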
(reducer-with-finalize reducer fin-fn)
(reducer-xform->reducer reducer xform)
Given a reducer and a transducer xform, produce a new reducer which will apply the transducer pipeline before its reduction function.

```clojure
ham-fisted.api> (reduce-reducer (reducer-xform->reducer (Sum.) (clojure.core/filter even?))
                                (range 1000))
#<Sum@479456: {:sum 249500.0, :n-elems 500}>
```

!! - If you use a stateful transducer here then you must *not* use the reducer in a parallelized reduction.
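One way a stateful transducer can go wrong under batching is sketched below in plain `clojure.core`. The sketch assumes each batch ends up with its own transducer state. How the library actually handles that state is not stated here, and state shared across threads would instead be subject to races.

```clojure
(def data (vec (range 10)))

;; Serial: one (take 3) instance sees the whole input.
(def serial-result (transduce (take 3) + 0 data))

;; Simulated parallel run: each batch gets its own (take 3) state,
;; so every batch contributes its first three elements.
(def batched-result
  (->> (partition-all 5 data)
       (map (fn [batch] (transduce (take 3) + 0 batch)))
       (reduce +)))

;; serial-result  => 3   (0 + 1 + 2)
;; batched-result => 21  ((0 + 1 + 2) + (5 + 6 + 7))
```

Stateless transducers such as `map` and `filter` give the same answer either way, which is why they are safe in a parallelized reduction.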
Parallel reduction merge function that expects both sides to be instances of Reducible.