Support functions for working with histories. This provides two things you need for writing efficient checkers:
1. A dedicated Op defrecord which speeds up the most commonly accessed fields and reduces memory footprint.
2. A History datatype which generally works like a vector, but also supports efficient fetching of operations by index, mapping back and forth between invocations and completions, efficient lazy map/filter, fusable concurrent reduce/fold, and a dependency-oriented task executor.
## Ops
Create operations with the `op` function. Unlike most defrecords, we pretty-print these as if they were maps--we print a LOT of them.
    (require '[jepsen.history :as h])
    (def o (h/op {:process 0, :type :invoke, :f :read, :value [:x nil],
                  :index 0, :time 0}))
    (pprint o)
    ; {:process 0,
    ;  :type :invoke,
    ;  :f :read,
    ;  :value [:x nil],
    ;  :index 0,
    ;  :time 0}
We provide a few common functions for interacting with operations:
    (h/invoke? o)    ; true
    (h/client-op? o) ; true
    (h/info? o)      ; false
And of course you can use fast field accessors here too:
(.process o) ; 0
## Histories
Given a collection of operations, create a history like so:
    (def h (h/history [{:process 0, :type :invoke, :f :read}
                       {:process 0, :type :ok, :f :read, :value 5}]))
`history` automatically lifts maps into Ops if they aren't already, and adds indices (sequential) and times (-1) if you omit them. There are options to control how indices are added; see `history` for details.
    (pprint h)
    ; [{:process 0, :type :invoke, :f :read, :value nil, :index 0, :time -1}
    ;  {:process 0, :type :ok, :f :read, :value 5, :index 1, :time -1}]
If you need to convert these back to plain-old maps for writing tests, use `as-maps`.
    (h/as-maps h)
    ; [{:index 0, :time -1, :type :invoke, :process 0, :f :read, :value nil}
    ;  {:index 1, :time -1, :type :ok, :process 0, :f :read, :value 5}]
Histories work almost exactly like vectors (though you can't assoc or conj into them).
    (count h)
    ; 2
    (nth h 1)
    ; {:index 1, :time -1, :type :ok, :process 0, :f :read, :value 5}
    (map :type h)
    ; [:invoke :ok]
But they have a few extra powers. You can get the Op with a particular :index regardless of where it is in the collection.
(h/get-index h 0) ; {:index 0, :time -1, :type :invoke, :process 0, :f :read, :value nil}
And you can find the corresponding invocation for a completion, and vice-versa:
(h/invocation h {:index 1, :time -1, :type :ok, :process 0, :f :read, :value 5}) ; {:index 0, :time -1, :type :invoke, :process 0, :f :read, :value nil}
(h/completion h {:index 0, :time -1, :type :invoke, :process 0, :f :read, :value nil}) ; {:index 1, :time -1, :type :ok, :process 0, :f :read, :value 5}
We call histories where the :index fields are 0, 1, 2, ... 'dense', and other histories 'sparse'. With dense histories, `get-index` is just `nth`. Sparse histories are common when you're restricting yourself to just a subset of the history, like operations on clients. If you pass sparse indices to `(h/history ops)`, then ask for an op by index, it'll do a one-time fold over the ops to find their indices, then cache a lookup table to make future lookups fast.
    (def h (h/history [{:index 3, :process 0, :type :invoke, :f :cas, :value [7 8]}]))
    (h/dense-indices? h)
    ; false
    (h/get-index h 3)
    ; {:index 3, :time -1, :type :invoke, :process 0, :f :cas, :value [7 8]}
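To make the lookup-table idea concrete, here is a minimal sketch in plain Clojure. It is not how `jepsen.history` implements `get-index`; `index-table` and `get-by-index` are hypothetical helpers that only illustrate a single pass building an `:index` to position map, which later lookups can reuse.

```clojure
;; Hypothetical helpers, not part of jepsen.history.
(defn index-table
  "One pass over ops, building a map of :index -> position."
  [ops]
  (persistent!
    (reduce-kv (fn [m pos op] (assoc! m (:index op) pos))
               (transient {})
               (vec ops))))

(defn get-by-index
  "Looks up an op by its :index using a precomputed table."
  [ops table idx]
  (when-let [pos (get table idx)]
    (nth ops pos)))

(def sparse-ops
  [{:index 3, :type :invoke, :f :cas, :value [7 8]}
   {:index 7, :type :ok,     :f :cas, :value [7 8]}])

;; Build the table once, then reuse it for every lookup.
(def table (index-table sparse-ops))

(get-by-index sparse-ops table 7)
; {:index 7, :type :ok, :f :cas, :value [7 8]}
```

With dense indices the table would be redundant, since position and `:index` coincide.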
Let's get a slightly more involved history. This one has a concurrent nemesis crashing while process 0 writes 3.
    (def h (h/history [{:process 0, :type :invoke, :f :write, :value 3}
                       {:process :nemesis, :type :info, :f :crash}
                       {:process 0, :type :ok, :f :write, :value 3}
                       {:process :nemesis, :type :info, :f :crash}]))
Of course we can filter this to just client operations using regular seq operations...
    (filter h/client-op? h)
    ; [{:process 0, :type :invoke, :f :write, :value 3, :index 0, :time -1}
    ;  {:process 0, :type :ok, :f :write, :value 3, :index 2, :time -1}]
But `jepsen.history` also exposes a more efficient version:
    (h/filter h/client-op? h)
    ; [{:index 0, :time -1, :type :invoke, :process 0, :f :write, :value 3}
    ;  {:index 2, :time -1, :type :ok, :process 0, :f :write, :value 3}]
There are also shortcuts for common filtering ops: `client-ops`, `invokes`, `oks`, `infos`, and so on.
    (def ch (h/client-ops h))
    (type ch) ; jepsen.history.FilteredHistory
Creating a filtered history is O(1), and acts as a lazy view on top of the underlying history. Like `clojure.core/filter`, it materializes elements as needed. Unlike Clojure's `filter`, it does not (for most ops) cache results in memory, so we can work with collections bigger than RAM. Instead, each seq/reduce/fold/etc applies the filtering function to the underlying history on-demand.
When you ask for a count, or to fetch operations by index, or to map between invocations and completions, a FilteredHistory computes a small, reduced data structure on the fly, and caches it to make later operations of the same type fast.
    (count ch) ; Folds over entire history to count how many match the predicate
    ; 2
    (count ch) ; Cached
    ; 2
    (h/completion ch (first ch)) ; Folds over history to pair up ops, caches
    ; {:index 2, :time -1, :type :ok, :process 0, :f :write, :value 3}
    (h/get-index ch 2) ; No fold required; underlying history does get-index
    ; {:index 2, :time -1, :type :ok, :process 0, :f :write, :value 3}
Similarly, `h/map` constructs an O(1) lazy view over another history. These compose just like normal Clojure `map`/`filter`, and all share structure with the underlying history.
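The "lazy view that does not cache" behaviour has a close analogue in core Clojure: `eduction`. The sketch below is only an analogy, not the FilteredHistory implementation. `client-op?*` is a hypothetical stand-in for `h/client-op?` that also counts how often it is called, so we can see the predicate being re-applied on every pass.

```clojure
(def calls (atom 0))

;; Hypothetical stand-in for h/client-op?, instrumented with a call counter.
(defn client-op?* [op]
  (swap! calls inc)
  (integer? (:process op)))

(def ops [{:process 0,        :type :invoke, :f :write, :value 3}
          {:process :nemesis, :type :info,   :f :crash}
          {:process 0,        :type :ok,     :f :write, :value 3}
          {:process :nemesis, :type :info,   :f :crash}])

;; Building the view does no work: the predicate has not run yet.
(def view (eduction (filter client-op?*) (map :type) ops))
@calls ; 0

;; Each reduction re-applies the filter and map to the underlying ops.
(into [] view)                      ; [:invoke :ok]
(reduce (fn [n _] (inc n)) 0 view)  ; 2
@calls ; 8
```

Nothing is retained between passes, which is what lets this style of view work over collections larger than memory, at the cost of recomputing the predicate each time.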
### Folds
All histories support `reduce`, `clojure.core.reducers/fold`, Tesser's `tesser`, and `jepsen.history.fold/fold`. All four mechanisms are backed by a `jepsen.history.fold` Folder, which allows concurrent folds to be joined together on-the-fly and executed in fewer passes over the underlying data. Reducers, Tesser, and history folds can also be executed in parallel.
Histories created with `map` and `filter` share the folder of their underlying history, which means that two threads analyzing different views of the same underlying history can have their folds joined into a single pass automatically. This should hopefully be invisible to users, other than making things Automatically Faster.
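As a rough picture of what "joining folds into a single pass" means, consider two independent reductions combined into one that carries both accumulators side by side. This is a simplified sketch: `fuse` and the `{:init, :step}` map shape are hypothetical, and it ignores concurrency, chunking, and early termination, all of which the real folder handles.

```clojure
;; Hypothetical helper: fuses two {:init, :step} reductions into one that
;; walks the collection once and keeps both accumulators in a pair.
(defn fuse [a b]
  {:init [(:init a) (:init b)]
   :step (fn [[acc-a acc-b] x]
           [((:step a) acc-a x)
            ((:step b) acc-b x)])})

(def count-ops {:init 0   :step (fn [n _] (inc n))})
(def op-types  {:init #{} :step (fn [s op] (conj s (:type op)))})

(def ops [{:process 0,        :type :invoke, :f :write, :value 3}
          {:process :nemesis, :type :info,   :f :crash}
          {:process 0,        :type :ok,     :f :write, :value 3}
          {:process :nemesis, :type :info,   :f :crash}])

;; One traversal of ops produces both results.
(def fused-result
  (let [{:keys [init step]} (fuse count-ops op-types)]
    (reduce step init ops)))

fused-result ; [4 #{:invoke :info :ok}]
```

When reading each element is expensive (parsing, disk IO), sharing the traversal this way is where the savings come from.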
If you filter a history to a small subset of operations, or are comfortable working in-memory, it may be sensible to materialize a history. Just use `(vec h)` to convert a history to a plain-old Clojure vector again.
### Tasks
Analyzers often perform several independent reductions over a history, and then compute new values based on those previous reductions. You can of course use `future` for this, but histories also come with a shared, dependency-aware threadpool executor for executing compute-bound concurrent tasks. All histories derived from the same history share the same executor, which means multiple checkers can launch tasks on it without launching a bazillion threads. For instance, we might need to know if a history includes crashes:
    (def first-crash
      (h/task h find-first-crash []
        (->> h (h/filter (comp #{:crash} :f)) first)))
Like futures, deref'ing a task yields its result, or throws.
    @first-crash
    ; {:index 1, :time -1, :type :info, :process :nemesis, :f :crash, :value nil}
Unlike futures, tasks can express dependencies on other tasks:
    (def ops-before-crash
      (h/task h writes [fc first-crash]
        ; fc is bound to the result of the first-crash task
        (let [i (:index fc)]
          (into [] (take-while #(< (:index %) i)) h))))
This task won't run until first-crash has completed, and receives the result of the first-crash task as its argument.
@ops-before-crash ; [{:index 0, :time -1, :type :invoke, :process 0, :f :write, :value 3}]
See `jepsen.history.task` for more details.
Basic interfaces and protocols, utility functions
Provides a stateful folder for running folds (like `reduce`) over chunked, immutable collections in linear and concurrent passes. Intended for systems where the reduction over a chunk may involve expensive work, and not fit in memory--for instance, deserializing values from disk. Provides sophisticated optimizations for running folds in parallel, and automatically fusing together multiple folds.
To build a folder, you need a chunkable collection: see jepsen.history.core. Jepsen.history chunks vectors by default at 16384 elements per chunk, which is a bit big for a demonstration, so let's chunk explicitly:
    (require '[tesser.core :as t]
             '[jepsen.history [core :as hc] [fold :as f]])
    (def dogs [{:legs 6, :name :noodle},
               {:legs 4, :name :stop-it},
               {:legs 4, :name :brown-one-by-the-fish-shop}])
    (def chunked-dogs (hc/chunked 2 dogs))
    (pprint (hc/chunks chunked-dogs))
    ; ([{:legs 6, :name :noodle} {:legs 4, :name :stop-it}]
    ;  [{:legs 4, :name :brown-one-by-the-fish-shop}])
In real use, chunks should be big enough to take a bit (a second or so?) to reduce. We keep track of some state for each chunk, so millions is probably too many. If you have fewer chunks than processors, we won't be able to optimize as efficiently.
A folder wraps a chunked collection, like so:
(def f (f/folder chunked-dogs))
Now we can perform a reduction on the folder. This works just like Clojure reduce:
    (reduce (fn [max-legs dog] (max max-legs (:legs dog))) 0 f)
    ; => 6
Which means transducers and into work like you'd expect:
(into #{} (map :legs) f) ; => #{4 6}
OK, great. What's the point? Imagine we had a collection where getting elements was expensive--for instance, if they required IO or expensive processing. Let's put ten million dogs on disk as JSON.
(require '[jepsen.history.fold-test :as ft]) (def dogs (ft/gen-dogs-file! 1e7)) (def f (f/folder dogs))
Reducing over ten million dogs as JSON takes about ten seconds on my machine.
(time (into #{} (map :legs) dogs)) ; 10364 msecs
But with a folder, we can do something neat:
    (def leg-set {:reducer-identity (constantly #{})
                  :reducer (fn [legs dog] (conj legs (:legs dog)))
                  :combiner clojure.set/union})
    (time (f/fold f leg-set)) ; 1660 msecs
This went roughly six times faster because the folder reduced each chunk in parallel. Now let's run, say, ten reductions in parallel.
(time (doall (pmap (fn [_] (into #{} (map :legs) dogs)) (range 10)))) ; 28477 msecs
This 28 seconds is faster than running ten folds sequentially (which, at about 10.4 seconds each, would have been roughly 104 seconds), because we've got multiple cores to do the reduction. But we're still paying a significant cost because each of those reductions has to re-parse the file as it goes.
(time (doall (pmap (fn [_] (f/fold f leg-set)) (range 10)))) ; 2261 msecs
Twelve times faster than the parallel version! And roughly 45x faster than doing the reductions naively in serial. How? Because when you ask a folder to reduce something, and it's already running another reduction, it joins your new reduction to the old one and performs most (or even all) of the work in a single pass. The folder is smart enough to do this for both linear and concurrent folds--and it does it while ensuring strict order and thread safety for mutable accumulators. Let's replace that reduction with a mutable HashSet, and convert it back to a Clojure set at the end.
    (import java.util.HashSet)
    (defn mut-hash-set [] (HashSet.))
    (def fast-leg-set {:reducer-identity mut-hash-set
                       :reducer (fn [^HashSet s, dog] (.add s (:legs dog)) s)
                       :combiner-identity mut-hash-set
                       :combiner (fn [^HashSet s1, ^HashSet s2] (.addAll s1 s2) s1)
                       :post-combiner set})
    (time (doall (pmap (fn [_] (f/fold f fast-leg-set)) (range 10))))
    ; 2197 msecs
# In general
A fold represents a reduction over a history, which can optionally be executed over chunks concurrently. It's a map with the following fields:
; Metadata
:name The unique name of this fold. May be any object, but probably a keyword.
; How to reduce a chunk
:reducer-identity A function (f history) which generates an identity object for a reduction over a chunk.
:reducer A function (f history acc op) which takes a history, a chunk accumulator, and an operation from the history, and returns a new accumulator.
:post-reducer A function (f history acc) which takes the final accumulator from a chunk and transforms it before it is passed to the combiner.
; How to combine chunks together
:combiner-identity A function (f history) which generates an identity object for combining chunk results together.
:combiner A function (f history acc chunk-result) which folds the result of a chunk into the combiner's accumulator. If nil, performs a left fold linearly, and does not combine at all.
:post-combiner A function (f history acc) which takes the final acc from merging all chunks and produces the fold's return value.
; Execution hints
:associative? If true, the combine function is associative, and can be applied in any order. If false, we must combine left-to-right. Right now this does nothing; we haven't implemented associative combine.
:asap? Folders ramp up processing of concurrent folds gradually to give other folds a chance to join the pass. Setting this to `true` disables that optimization, which means a fold can complete more quickly--at the cost of slowing down other folds.
Folds should be pure functions of their histories, though reducers and combiners are allowed to use in-memory mutability; each is guaranteed to be single-threaded. We guarantee that reducers execute at most once per element, and at most once per chunk. When not using `reduced`, they are exactly-once.
The final return value from a fold should be immutable so that other readers or folds can use it safely in a concurrent context.
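To see how the fields of a fold map fit together, here is a deliberately naive, sequential reference executor. It is a sketch under stated assumptions, not the folder's algorithm: `run-fold` is a hypothetical name, chunking is done with `partition-all`, and the function arities follow the `leg-set` example above (zero-argument identities, two-argument reducer and combiner) rather than the `(f history ...)` forms in the field list. It also assumes that a missing `:combiner-identity` falls back to `:reducer-identity`, and that missing post-functions behave like `identity`.

```clojure
(require '[clojure.set :as set])

(defn run-fold
  "Hypothetical sequential executor for a fold map. Reduces each chunk
  independently, then combines the chunk results left to right."
  [chunk-size coll {:keys [reducer-identity reducer post-reducer
                           combiner-identity combiner post-combiner]}]
  (let [post-reducer      (or post-reducer identity)
        post-combiner     (or post-combiner identity)
        ;; Assumption: fall back to the reducer identity when none is given.
        combiner-identity (or combiner-identity reducer-identity)
        chunk-results     (map (fn [chunk]
                                 (post-reducer
                                   (reduce reducer (reducer-identity) chunk)))
                               (partition-all chunk-size coll))]
    (post-combiner
      (reduce combiner (combiner-identity) chunk-results))))

(def dogs [{:legs 6, :name :noodle}
           {:legs 4, :name :stop-it}
           {:legs 4, :name :brown-one-by-the-fish-shop}])

(def leg-set
  {:reducer-identity (constantly #{})
   :reducer          (fn [legs dog] (conj legs (:legs dog)))
   :combiner         set/union})

(run-fold 2 dogs leg-set)
; #{4 6}
```

Because each chunk starts from a fresh identity and never shares its accumulator, the per-chunk reductions are the part a real folder can run in parallel; only the combine step needs to respect chunk order.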
## Early Termination
For linear folds (i.e. those with no combiner), we support the usual `(reduced x)` early-return mechanism. This means reduce and transduce work as you'd expect. Reduced values still go through the post-reducer function.
For concurrent folds, `(reduced x)` in a reducer terminates reduction of that particular chunk. Other chunks are still reduced. Reduced values go through the post-reducer and are passed to the combiner unwrapped. If the combiner returns a reduced value, that value passes immediately to the post-combiner, without considering any other chunks.
Like transducers, post-reducers and post-combiners act on the values inside Reduced wrappers; they cannot distinguish between reduced and non-reduced values.
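The chunk-level behaviour of `reduced` can be illustrated with plain `reduce` calls. This is a sequential sketch with made-up data, not the folder itself: `find-crash` is a hypothetical reducer that records which ops it visits, so we can see that a `reduced` value stops only its own chunk, while a `reduced` from the combining step stops the combine.

```clojure
(def seen (atom []))

;; Hypothetical reducer: returns the first :crash op in a chunk, wrapped in
;; reduced so the rest of that chunk is skipped.
(defn find-crash [_ op]
  (swap! seen conj (:index op))
  (when (= :crash (:f op))
    (reduced op)))

(def chunks
  [[{:index 0, :f :write} {:index 1, :f :crash} {:index 2, :f :write}]
   [{:index 3, :f :crash} {:index 4, :f :write}]])

;; Every chunk is reduced; each one stops at its own first crash.
;; reduce hands back the unwrapped value.
(def chunk-results
  (mapv #(reduce find-crash nil %) chunks))

@seen ; [0 1 3]

;; Combine left to right, stopping as soon as any chunk found a crash.
(def first-crash
  (reduce (fn [acc r] (if r (reduced r) acc)) nil chunk-results))

first-crash ; {:index 1, :f :crash}
```

Ops 2 and 4 are never visited, yet the second chunk still ran up to its own crash, matching the "other chunks are still reduced" rule.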
A transactional, dependency-graph oriented task scheduler. Provides a stateful executor for CPU-bound tasks backed by a num-cores ThreadPoolExecutor, and allows you to submit tasks to be run on that executor.
(require '[jepsen.history.task :as task]) (def e (task/executor))
At a very high level, a task is a named function of optional dependencies (inputs) which returns an output. Here's a task with no dependencies:
(def pet (task/submit! e :pet-dog (fn [_] :petting-dog)))
Tasks are derefable with the standard blocking and nonblocking calls. Derefing a task returns its output. You can check whether a task has completed with `realized?`.
    @pet            ; :petting-dog
    (realized? pet) ; true
If a task throws, its output is the Throwable it threw. Derefing that task also throws, like Clojure futures. Exceptions propagate to dependents: tasks which depend on a failed task will never execute, and if derefed, will throw as well.
    (def doomed (task/submit! e :doomed (fn [_] (assert false))))
    ; All fine, until
    @doomed
    ; throws Assert failed: false
Each task is assigned a unique long ID by its executor. Tasks should never be used across executors; their hashcodes and equality semantics are, for performance reasons, by ID alone.
(task/id pet) ; 0
Tasks also have names, which can be any non-nil object, and are used for debugging & observability. Tasks can also carry an arbitrary data object, which can be anything you like. You can use this to build more sophisticated task management systems around this executor.
(task/name pet) ; :pet-dog
(def train (task/submit! e :train {:tricks [:down-stay :recall]} nil (fn [_] :training-dog))) (task/data train) ; {:tricks [:down-stay :recall]}
When submitted, tasks can depend on earlier tasks. When it executes, a task receives a vector of the outputs of its dependencies. A task only executes once its dependencies have completed, and will observe their memory effects.
    (def dog-promise (promise))
    (def dog-task (task/submit! e :make-dog (fn [_] @dog-promise)))
    (def person-task (task/submit! e :make-person (fn [_] :nona)))
    (def adopt-task (task/submit! e :adopt [person-task dog-task]
                      (fn [[person dog]]
                        (prn person :adopts dog)
                        :adopted!)))
    ; Adopt pauses, waiting on dog, which in turn is waiting on our
    ; dog-promise.
    (realized? adopt-task) ; false
    (task/dep-ids adopt-task) ; [5 4]
    ; Person completed immediately:
    @person-task ; :nona
    ; Let's let the dog task run. Once it does, adopt can run too.
    (deliver dog-promise :noodle)
    ; Immediately prints :nona :adopts :noodle
    ; Now we can deref the adoption task.
    @adopt-task ; :adopted!
Tasks may be cancelled. Cancelling a task also cancels all tasks which depend on it. Unlike normal ThreadPoolExecutors, cancellation is guaranteed to be safe: if a task is still pending, it will never run. Cancelling a task which is running or has already run has no effect, other than removing it from the executor state. This may not be right for all applications; it's important for us.
(task/cancel! e adopt-task)
Tasks either run to completion or are cancelled; they are never interrupted. If they are, who knows what could happen? Almost certainly the executor will stall some tasks forever. Hopefully you're throwing away the executor and moving on with your life.
Unlike standard j.u.c executors, tasks in this system may be created, queried, and cancelled transactionally. The executor's state (`State`) is a persistent, immutable structure. You perform a transaction with `(txn! executor (fn [state] ...))`, where the function takes the current state and returns a new one. Any transformations you apply to the state take place atomically. Note that transactions use pure functions without `!`: `submit` instead of `submit!`, etc.
; Create a task which blocks...
(def dog-promise (promise))
(def dog-task (task/submit! e :find-dog (fn [_] @dog-promise)))
; And one that depends on it
(def adopt-task (task/submit! e :adopt-dog [dog-task]
  (fn [dog]
    (println :adopting dog)
    [:adopted dog])))

; adopt-task hasn't run because dog-task is still pending.
(task/txn! e (fn [state]
  ; Make sure the adoption hasn't happened yet
  (if-not (task/pending? state adopt-task)
    state
    ; If it hasn't happened yet, cancel the dog task. Adoption will be
    ; cancelled automatically.
    (let [state' (task/cancel state dog-task)
          ; And do something else
          [state' new-task] (task/submit state' :enter-tomb
                              (fn [_]
                                (prn :oh-no)
                                :tomb-entered))]
      state'))))
; prints :oh-no

(deliver dog-promise :noodle)
; Adoption never happens!
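To see why the pure variants compose inside a transaction, here is a minimal sketch of the same pattern in plain Clojure. It is a toy model, not the library's State type or its txn! implementation: it assumes the state is an immutable map held in an atom, and every name below (executor, submit, cancel, pending?, txn!, submit!) is redefined locally for illustration. Tasks here are plain maps and never actually run.

```clojure
;; Toy model only: an immutable map in an atom stands in for executor state.
(def executor (atom {:next-id 0, :tasks {}}))

(defn submit
  "Pure: returns [state' task], where state' includes the new task."
  [state task-name]
  (let [id   (:next-id state)
        task {:id id, :name task-name}]
    [(-> state
         (update :next-id inc)
         (assoc-in [:tasks id] task))
     task]))

(defn cancel
  "Pure: returns state' with the given task removed."
  [state task]
  (update state :tasks dissoc (:id task)))

(defn pending?
  "Is the task still present in this state?"
  [state task]
  (contains? (:tasks state) (:id task)))

(defn txn!
  "Atomically replaces the executor's state with (f state) and returns the
  new state. swap! may call f more than once, so f must be pure."
  [executor f]
  (swap! executor f))

(defn submit!
  "Impure wrapper: runs submit in its own transaction and returns the task."
  [executor task-name]
  (let [result (volatile! nil)]
    (txn! executor (fn [state]
                     (let [[state' task] (submit state task-name)]
                       (vreset! result task)
                       state')))
    @result))

(def dog-task (submit! executor :find-dog))

;; Cancel one task and submit another in a single atomic step.
(txn! executor
      (fn [state]
        (if-not (pending? state dog-task)
          state
          (let [state'      (cancel state dog-task)
                [state' _t] (submit state' :enter-tomb)]
            state'))))

(map :name (vals (:tasks @executor))) ; (:enter-tomb)
```

Because submit and cancel only compute a new state, no other thread can observe the intermediate point at which the dog task is gone but the replacement task has not yet been added.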