
ham-fisted.reduce

Protocol-based parallel reduction architecture and helper functions.


->consumer clj

(->consumer cfn)

Return an instance of a consumer, double consumer, or long consumer.

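A hedged usage sketch - the exact consumer type returned depends on how `cfn` is type-hinted; this simply wraps a plain side-effecting function and drives it with [[consume!]]:

```clojure
;; Sketch, not from the docstring: wrap a Clojure fn as a
;; java.util.function-style consumer.
(require '[ham-fisted.reduce :as hamf-rf])

(def printer (hamf-rf/->consumer (fn [v] (println "saw" v))))

;; consume! reduces the collection, ignoring the return value,
;; and returns the consumer itself.
(hamf-rf/consume! printer (range 3))
```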

bind-double-consumer-reducer! clj

(bind-double-consumer-reducer! ctor)
(bind-double-consumer-reducer! cls-type ctor)

Bind a class type as a double-consumer parallel reducer - the consumer must implement DoubleConsumer, ham_fisted.Reducible, and IDeref.

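A hedged sketch of both arities, reusing the `MeanR` type defined in the [[double-consumer-preducer]] example elsewhere on this page:

```clojure
;; Sketch: MeanR implements DoubleConsumer, ham_fisted.Reducible and IDeref
;; (see the double-consumer-preducer entry for its deftype).
;; Single-arity: presumably the class type is taken from a constructed instance.
(bind-double-consumer-reducer! #(MeanR. 0 0))

;; Two-arity: pass the class type explicitly along with the constructor.
(bind-double-consumer-reducer! MeanR #(MeanR. 0 0))
```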

compose-reducers clj

(compose-reducers reducers)
(compose-reducers options reducers)

Given a map or sequence of reducers, return a new reducer that produces a map or vector of results.

If data is a sequence then context is guaranteed to be an object array.

Options:

* `:rfn-datatype` - One of nil, `:int64`, or `:float64`. This indicates that the rfns should all be uniform, accepting longs, doubles, or generically objects. Defaults to nil.
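A hedged sketch of composing reducers, assuming the `Sum` reducer from `ham_fisted` used in the [[preduce-reducers]] example below:

```clojure
;; Sketch: compose a map of reducers into one reducer; the input is
;; traversed once and a map of results comes back.
(require '[ham-fisted.reduce :as hamf-rf])
(import '[ham_fisted Sum])

(hamf-rf/preduce-reducer
 (hamf-rf/compose-reducers {:sum (Sum.) :product *})
 (range 1 21))
;; should return a map with :sum and :product entries
```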

consume! clj

(consume! consumer coll)

Consume a collection. This is simply a reduction where the return value is ignored.

Returns the consumer.

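A short sketch using the `Sum$SimpleSum` consumer that appears in the examples below:

```clojure
;; Sketch: consume! feeds every element to the consumer and returns the
;; consumer itself, which can then be dereferenced.
(import '[ham_fisted Sum$SimpleSum])

@(consume! (Sum$SimpleSum.) (range 1000))
;; should be 499500.0, matching the double-consumer-accumulator example
```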

consumer-accumulator clj

Generic reduction function using a consumer


consumer-preducer clj

(consumer-preducer constructor)

Bind a consumer as a parallel reducer.

Consumer must implement java.util.function.Consumer, ham_fisted.Reducible and clojure.lang.IDeref.

Returns instance of type bound.

See documentation for [[declare-double-consumer-preducer!]].


consumer-reducer clj

(consumer-reducer ctor)

Make a parallel consumer reducer given a function that takes no arguments and is guaranteed to produce a consumer which also implements Reducible and IDeref.

double-accumulator clj macro

(double-accumulator accvar varvar & code)

Type-hinted double reduction accumulator:

```clojure
ham-fisted.api> (reduce (double-accumulator acc v (+ (double acc) v))
                        0.0
                        (range 1000))
499500.0
```

double-consumer-accumulator clj

Converts from a double consumer to a double reduction accumulator that returns the consumer:

```clojure
ham-fisted.api> (reduce double-consumer-accumulator
                        (Sum$SimpleSum.)
                        (range 1000))
#<SimpleSum@2fbcf20: 499500.0>
ham-fisted.api> @*1
499500.0
```

double-consumer-preducer clj

(double-consumer-preducer constructor)

Return a preducer for a double consumer.

Consumer must implement java.util.function.DoubleConsumer, ham_fisted.Reducible and clojure.lang.IDeref.

```clojure
user> (require '[ham-fisted.api :as hamf])
nil
user> (import '[java.util.function DoubleConsumer])
java.util.function.DoubleConsumer
user> (import '[ham_fisted Reducible])
ham_fisted.Reducible
user> (import '[clojure.lang IDeref])
clojure.lang.IDeref
user> (deftype MeanR [^{:unsynchronized-mutable true :tag 'double} sum
                      ^{:unsynchronized-mutable true :tag 'long} n-elems]
        DoubleConsumer
        (accept [this v] (set! sum (+ sum v)) (set! n-elems (unchecked-inc n-elems)))
        Reducible
        (reduce [this o]
          (set! sum (+ sum (.-sum ^MeanR o)))
          (set! n-elems (+ n-elems (.-n-elems ^MeanR o)))
          this)
        IDeref (deref [this] (/ sum n-elems)))
user.MeanR
user> (hamf/declare-double-consumer-preducer! MeanR (MeanR. 0 0))
nil
user> (hamf/preduce-reducer (double-consumer-preducer #(MeanR. 0 0)) (hamf/range 200000))
99999.5
```

double-consumer-reducer clj

(double-consumer-reducer ctor)

Make a parallel double consumer reducer given a function that takes no arguments and is guaranteed to produce a double consumer which also implements Reducible and IDeref.

immut-map-kv clj

(immut-map-kv ks vs)
(immut-map-kv keyfn valfn data)

indexed-accum clj macro

(indexed-accum accvar idxvar varvar & code)

Create an indexed accumulator that receives an additional long index during a reduction:

```clojure
ham-fisted.api> (reduce (indexed-accum
                         acc idx v (conj acc [idx v]))
                        []
                        (range 5))
[[0 0] [1 1] [2 2] [3 3] [4 4]]
```

indexed-double-accum clj macro

(indexed-double-accum accvar idxvar varvar & code)

Create an indexed double accumulator that receives an additional long index during a reduction:

```clojure
ham-fisted.api> (reduce (indexed-double-accum
                         acc idx v (conj acc [idx v]))
                        []
                        (range 5))
[[0 0.0] [1 1.0] [2 2.0] [3 3.0] [4 4.0]]
```

indexed-long-accum clj macro

(indexed-long-accum accvar idxvar varvar & code)

Create an indexed long accumulator that receives an additional long index during a reduction:

```clojure
ham-fisted.api> (reduce (indexed-long-accum
                         acc idx v (conj acc [idx v]))
                        []
                        (range 5))
[[0 0] [1 1] [2 2] [3 3] [4 4]]
```

long-accumulator clj macro

(long-accumulator accvar varvar & code)

Type-hinted long reduction accumulator:

```clojure
ham-fisted.api> (reduce (long-accumulator acc v (+ (long acc) v))
                        0
                        (range 1000))
499500
```

long-consumer-accumulator clj

Converts from a long consumer to a long reduction accumulator that returns the consumer:

```clojure
ham-fisted.api> (reduce long-consumer-accumulator
                        (Sum$SimpleSum.)
                        (range 1000))
#<SimpleSum@2fbcf20: 499500.0>
ham-fisted.api> @*1
499500.0
```

long-consumer-reducer clj

(long-consumer-reducer ctor)

Make a parallel long consumer reducer given a function that takes no arguments and is guaranteed to produce a long consumer which also implements Reducible and IDeref.

options->parallel-options clj

(options->parallel-options options)

Convert an options map to a parallel options object.

Options:

* `:pool` - supply the ForkJoinPool to use.
* `:max-batch-size` - Defaults to 64000; used for index-based parallel pathways to control the size of each parallelized batch.
* `:ordered?` - When true, process inputs and provide results in order.
* `:parallelism` - The amount of parallelism to expect. Defaults to the number of threads in the fork-join pool provided.
* `:cat-parallelism` - Either `:seq-wise` or `:elem-wise`. When parallelizing over a concatenation of containers, either parallelize each container (`:elem-wise`, calling preduce on each container using many threads per container) or use one thread per container (`:seq-wise`). Defaults to `:seq-wise` as this doesn't require each container itself to support parallelization, but it relies on the sequence of containers being long enough to saturate the processor. Can also be set at time of container construction - see [[lazy-noncaching/concat-opts]].
* `:put-timeout-ms` - The time to wait to put data into the queue. This is a safety mechanism so that if the processing system fails we don't just keep putting things into a queue.
* `:unmerged-result?` - Use with care. For parallel reductions, do not perform the merge step but instead return the sequence of partially reduced results.
* `:n-lookahead` - How far ahead pmap and upmap look to add new jobs to the queue. Defaults to `(* 2 parallelism)`.

parallel-reducer clj

(parallel-reducer init-fn rfn merge-fn)
(parallel-reducer init-fn rfn merge-fn fin-fn)

Implement a parallel reducer by explicitly passing in the various required functions.

* `init-fn` - Takes no arguments and returns a new accumulation target.
* `rfn` - Clojure rf function - takes two arguments, the accumulation target and a new value, and produces a new accumulation target.
* `merge-fn` - Given two accumulation targets, returns a new combined accumulation target.
* `fin-fn` - optional - Given an accumulation target, returns the desired final type.

```clojure
user> (hamf-rf/preduce-reducer
       (hamf-rf/parallel-reducer
        hamf/mut-set
        #(do (.add ^java.util.Set %1 %2) %1)
        hamf/union
        hamf/sort)
       (lznc/map (fn ^long [^long v] (rem v 13)) (hamf/range 1000000)))
[0 1 2 3 4 5 6 7 8 9 10 11 12]
```

preduce clj

(preduce init-val-fn rfn merge-fn coll)
(preduce init-val-fn rfn merge-fn options coll)

Parallelized reduction. Currently coll must either be random access or a lznc map/filter chain based on one or more random access entities, hashmaps and sets from this library, or any java.util set, hashmap, or concurrent versions of these. If the input cannot be parallelized this lowers to a normal serial reduction.

For potentially small-n invocations, providing the parallel options explicitly can improve performance surprisingly - converting the options map to the parallel options object takes a bit of time.

* `init-val-fn` - Potentially called in reduction threads to produce each initial value.
* `rfn` - normal Clojure reduction function. Typehinting the second argument to double or long will sometimes produce a faster reduction.
* `merge-fn` - Merge two reduction results into one.

Options:

* `:pool` - The fork-join pool to use. Defaults to the common pool, which assumes the reduction is cpu-bound.
* `:parallelism` - What parallelism to use - defaults to the pool's `getParallelism` method.
* `:max-batch-size` - Rough maximum batch size for indexed or grouped reductions. This can both even out batch times and ensure you don't get into safepoint trouble with jdk-8.
* `:min-n` - Minimum number of elements before initiating a parallelized reduction. Defaults to 1000, but you should customize this for your specific reduction.
* `:ordered?` - True if results should be in order. Unordered results are sometimes slightly faster, but again you should test for your specific situation.
* `:cat-parallelism` - Either `:seq-wise` or `:elem-wise`, defaults to `:seq-wise`. Test for your specific situation; this really is data-dependent. This controls how a concat primitive parallelizes the reduction across its containers. Elem-wise means each container's reduction is individually parallelized, while seq-wise indicates to do a pmap-style initial reduction across containers and then merge the results.
* `:put-timeout-ms` - Number of milliseconds to wait for queue space before throwing an exception in unordered reductions. Defaults to 50000.
* `:unmerged-result?` - Defaults to false. When true, the sequence of results will be returned directly without any merge steps in a lazy-noncaching container. Beware the noncaching aspect - repeatedly evaluating this result may kick off the parallelized reduction multiple times. To ensure caching if unsure, call `seq` on the result.
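A minimal sketch of the three functions working together - a parallel sum over a range (the arithmetic result is exact; whether the reduction actually parallelizes depends on `:min-n` and the input):

```clojure
(require '[ham-fisted.reduce :as hamf-rf])

;; init-val-fn: each reduction thread starts from its own 0.
;; rfn:         clojure.core/+ accumulates elements.
;; merge-fn:    + also merges two partial sums.
(hamf-rf/preduce (constantly 0) + + (range 1000000))
;; => 499999500000
```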

preduce-reducer clj

(preduce-reducer reducer coll)
(preduce-reducer reducer options coll)

Given an instance of [[ham-fisted.protocols/ParallelReducer]], perform a parallel reduction.

In the case where the result is requested unmerged, finalize will be called on each result in a lazy noncaching way. In this case you can use a non-parallelized reducer and simply get a sequence of results as opposed to one.

* reducer - instance of ParallelReducer.
* options - Same options as preduce.
* coll - something potentially with a parallelizable reduction.

See options for [[ham-fisted.reduce/preduce]].

Additional Options:

* `:skip-finalize?` - when true, the reducer's finalize method is not called on the result.

preduce-reducers clj

(preduce-reducers reducers coll)
(preduce-reducers reducers options coll)

Given a map or sequence of [[ham-fisted.protocols/ParallelReducer]], produce a map or sequence of reduced values. Reduces over the input coll once, in parallel if coll is large enough. See options for [[ham-fisted.reduce/preduce]].

```clojure
ham-fisted.api> (preduce-reducers {:sum (Sum.) :mult *} (range 20))
{:mult 0, :sum #<Sum@5082c3b7: {:sum 190.0, :n-elems 20}>}
```

reduce-reducer clj

(reduce-reducer reducer coll)

Serially reduce a reducer.

```clojure
ham-fisted.api> (reduce-reducer (Sum.) (range 1000))
#<Sum@afbedb: {:sum 499500.0, :n-elems 1000}>
```

reduce-reducers clj

(reduce-reducers reducers coll)

Serially reduce a map or sequence of reducers into a map or sequence of results.

```clojure
ham-fisted.api> (reduce-reducers {:a (Sum.) :b *} (range 1 21))
{:b 2432902008176640000, :a #<Sum@6bcebeb1: {:sum 210.0, :n-elems 20}>}
```

reducer->completef clj

(reducer->completef reducer)

Return a fold-compatible pair of [reducef, completef] given a parallel reducer. Note that folded reducers are not finalized as of this time:

```clojure
ham-fisted.api> (def data (vec (range 200000)))
#'ham-fisted.api/data
ham-fisted.api> (r/fold (reducer->completef (Sum.)) (reducer->rfn (Sum.)) data)
#<Sum@858c206: {:sum 1.99999E10, :n-elems 200000}>
```

reducer->rf clj

(reducer->rf reducer)

Given a reducer, return a transduce-compatible rf:

```clojure
ham-fisted.api> (transduce (clojure.core/map #(+ % 2)) (reducer->rf (Sum.)) (range 200))
{:sum 20300.0, :n-elems 200}
```

reducer-with-finalize clj

(reducer-with-finalize reducer fin-fn)
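This var has no docstring here; from its signature, a hedged guess is that it returns a variant of `reducer` whose finalize step is replaced by `fin-fn`:

```clojure
;; Hypothetical sketch only - reducer-with-finalize is undocumented on this
;; page. Presumably fin-fn receives the reduced accumulation and produces
;; the final value, e.g. pulling a single key out of Sum's result.
(import '[ham_fisted Sum])

(preduce-reducer
 (reducer-with-finalize (Sum.) #(get @% :sum))
 (range 1000))
```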

reducer-xform->reducer clj

(reducer-xform->reducer reducer xform)

Given a reducer and a transducer xform, produce a new reducer which will apply the transducer pipeline before its reduction function.

```clojure
ham-fisted.api> (reduce-reducer (reducer-xform->reducer (Sum.) (clojure.core/filter even?))
                                (range 1000))
#<Sum@479456: {:sum 249500.0, :n-elems 500}>
```

!! - If you use a stateful transducer here then you must *not* use the reducer in a parallelized reduction.

reducible-merge clj

Parallel reduction merge function that expects both sides to be instances of Reducible.

