A library for building up pipelines of transducers using core.async for parallelism.
(chan-seq c)
Simple utility function that will convert a channel to a lazy seq.
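The conversion can be sketched as a lazy recursion over blocking takes. A minimal sketch, assuming core.async is aliased as `async`; the library's actual implementation may differ:

```clojure
(require '[clojure.core.async :as async])

;; Minimal sketch: repeatedly take from the channel until it closes,
;; building a lazy seq as we go.
(defn chan-seq [c]
  (lazy-seq
    (when-some [v (async/<!! c)]
      (cons v (chan-seq c)))))

;; Usage:
(chan-seq (async/to-chan [1 2 3])) ;; => (1 2 3)
```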
(channel? c)
Test if something is a channel by checking for certain interfaces.
(channeled source n)
Converts a pipeline source into a channel. Currently accepts BufferedReaders, channels, functions (which should return a seq) and collections.
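A dispatch along those lines might look like the following. This is an illustrative sketch, not the library's actual source; `channel?` and `n-or-all` are the helpers documented on this page:

```clojure
(require '[clojure.core.async :as async])

(defn channeled [source n]
  (cond
    ;; already a channel: pass it through untouched
    (channel? source) source
    ;; BufferedReader: stream its lines
    (instance? java.io.BufferedReader source)
    (async/to-chan (n-or-all n (line-seq source)))
    ;; a function is expected to return a seq
    (fn? source) (async/to-chan (n-or-all n (source)))
    ;; plain collection
    (coll? source) (async/to-chan (n-or-all n source))
    :else (throw (IllegalArgumentException. "Unsupported source type"))))
```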
(flow source & xfs)
Takes a source collection and an arbitrary number of transducers, and funnels the source through each transducer.
If the source is a plain sequential collection we defer to transduce and sequential execution.
If the source is a channel this will be parallelized using core.async channels. Returns the last transducer's channel.
For each transducer, it will create a channel and pipe in the contents of the previous channel. If the transducer has metadata indicating that it involves blocking, IO-bound operations it will pipe using core.async's thread and blocking operations. Otherwise it simply defers to async/pipe to connect the two channels.
We chose not to use async/pipeline since it performs extra work for features (ordering, multiple values) that we don't need.
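For illustration, a call might look like this. This is a hypothetical sketch: the metadata key marking a transducer as blocking is an assumption, and `slow-io-lookup` is a stand-in for an IO-bound function:

```clojure
(require '[clojure.core.async :as async])

(defn slow-io-lookup [x]
  ;; stand-in for a blocking, IO-bound call
  (Thread/sleep 10)
  x)

(def parse-xf  (map #(Long/parseLong %)))
(def lookup-xf (with-meta (map slow-io-lookup) {:blocking true}))

;; Sequential: a plain collection defers to transduce.
(flow ["1" "2" "3"] parse-xf)

;; Parallel: a channel source; returns the last transducer's channel.
(def out (flow (async/to-chan ["1" "2" "3"]) parse-xf lookup-xf))
```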
(kickoff & args)
Given an instance of Pipeline, will dispatch to `kickoff-pipeline`; otherwise dispatches to `kickoff-flow`. Created to allow backwards compatibility between pipelines and the lighter-weight API.
(kickoff-flow source xforms & {:keys [limit on-completed collect]})
Given a source, turns it into a channel, and funnels the contents through the xforms provided. Returns a tuple of the created channels and promise which will be delivered when processing is completed.
Takes the following options:
:limit - limit the number of items processed to n
:on-completed - a fn to call when processing is complete; this fn should take one arg, which will be the result of the computation
:collect - if truthy, will collect the output of the last channel into the returned promise
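Put together, a call using all three options might look like this (illustrative only):

```clojure
(let [[chans done] (kickoff-flow (range 1000000)
                                 [(map inc) (filter odd?)]
                                 :limit 1000
                                 :collect true
                                 :on-completed #(println "processed" (count %) "items"))]
  ;; with :collect truthy, the promise is delivered with the collected output
  @done)
```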
(kickoff-pipeline pipeline args)
(kickoff-pipeline pipeline n args)
Given an instance of Pipeline, will fetch its source, turn it into a channel, and funnel it into the pipeline's run function, where it will process the pipeline with multiple threads. Can take an optional n number of records to process; if n is nil, all records will be processed.
(n-or-all n coll)
If n is falsy, return the entire collection; otherwise take n elements and return those.
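A minimal sketch consistent with that description:

```clojure
(defn n-or-all [n coll]
  (if n
    (take n coll) ;; limit to the first n elements
    coll))        ;; falsy n: pass the whole collection through

(n-or-all 2 [1 2 3])   ;; => (1 2)
(n-or-all nil [1 2 3]) ;; => [1 2 3]
```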
(paral? xf)
Returns whether a given transformer should be treated as parallelizable.
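A sketch of such a check, assuming the flag lives in the transducer's metadata under a `:blocking` key (the exact key is an assumption):

```clojure
(defn paral? [xf]
  (boolean (:blocking (meta xf))))

(paral? (with-meta (map inc) {:blocking true})) ;; => true
(paral? (map inc))                              ;; => false
```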
(seqed source)
Converts a pipeline source into a sequence. Used by `tryout` for doing sequential execution.
(threaded-pipe n from xf)
Takes elements from the `from` channel and calls the transformer on them in a separate thread using blocking semantics. Useful if, say, the downstream channel has an IO-bound transducer attached to it. Merges the output of all these channels and puts it into a `to` channel that it creates. Returns this `to` channel.
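The mechanism described above could be sketched as follows (assumptions: `n` worker threads, the transducer attached to each worker's channel so that it runs on that worker's thread during the blocking put):

```clojure
(require '[clojure.core.async :as async])

(defn threaded-pipe [n from xf]
  (let [chans (repeatedly n #(async/chan 1 xf))]
    (doseq [c chans]
      ;; each worker drains `from` with blocking takes; the transducer on
      ;; `c` runs on this thread when we put, so blocking IO stays off the
      ;; go-block threadpool
      (async/thread
        (loop []
          (if-some [v (async/<!! from)]
            (do (async/>!! c v) (recur))
            (async/close! c)))))
    ;; the merged channel is the `to` channel the docstring mentions
    (async/merge (vec chans))))
```

Note that a stateful transducer would get independent state per worker channel in this sketch, which is one reason the real implementation may differ.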
(tryout pipeline args)
(tryout pipeline n args)
Given an instance of Pipeline, will fetch its source and funnel it into the pipeline's run function as a simple collection; this will cause the pipeline to be executed sequentially. Takes an optional number `n` of source records to process, defaulting to 1000.