A transactional, dependency-graph oriented task scheduler. Provides a stateful executor for CPU-bound tasks backed by a num-cores ThreadPoolExecutor, and allows you to submit tasks to be run on that executor.
(require '[jepsen.history.task :as task])
(def e (task/executor))
At a very high level, a task is a named function of optional dependencies (inputs) which returns an output. Here's a task with no dependencies:
(def pet (task/submit! e :pet-dog (fn [_] :petting-dog)))
Tasks are derefable with the standard blocking and nonblocking calls.
Derefing a task returns its output. You can check whether a task has completed with `realized?`.
@pet ; :petting-dog
(realized? pet) ; true
If a task throws, its output is the Throwable it threw. Derefing that task also throws, like Clojure futures. Exceptions propagate to dependents: tasks which depend on a failed task will never execute, and if derefed, will throw as well.
(def doomed (task/submit! e :doomed (fn [_] (assert false))))
; All fine, until
@doomed
; throws Assert failed: false
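To see the propagation concretely, here is a minimal sketch that submits a failing task and a second task that depends on it. It assumes the behavior described above; `failure-demo`, `try-deref`, and the task names are illustrative and not part of the library.

```clojure
(require '[jepsen.history.task :as task])

(def failure-demo (task/executor))

;; Hypothetical helper: deref a task, returning :threw instead of throwing.
(defn try-deref [t]
  (try @t
       (catch Throwable _ :threw)))

(def broken (task/submit! failure-demo :broken
                          (fn [_] (throw (ex-info "no dog available" {})))))

;; Depends on `broken`, so per the text above its function should never run.
(def walk (task/submit! failure-demo :walk [broken]
                        (fn [[dog]] [:walking dog])))

(try-deref broken) ; expected :threw
(try-deref walk)   ; expected :threw, since the failure propagates
```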
Each task is assigned a unique long ID by its executor. Tasks should never be used across executors; their hashcodes and equality semantics are, for performance reasons, by ID alone.
(task/id pet) ; 0
Tasks also have names, which can be any non-nil object, and are used for debugging & observability. Tasks can also carry an arbitrary data object, which can be anything you like. You can use this to build more sophisticated task management systems around this executor.
(task/name pet) ; :pet-dog
(def train (task/submit! e :train {:tricks [:down-stay :recall]} nil (fn [_] :training-dog)))
(task/data train) ; {:tricks [:down-stay :recall]}
When submitted, tasks can depend on earlier tasks. When it executes, a task receives a vector of the outputs of its dependencies. A task only executes once its dependencies have completed, and will observe their memory effects.
(def dog-promise (promise))
(def dog-task (task/submit! e :make-dog (fn [_] @dog-promise)))
(def person-task (task/submit! e :make-person (fn [_] :nona)))
(def adopt-task (task/submit! e :adopt [person-task dog-task]
  (fn [[person dog]]
    (prn person :adopts dog)
    :adopted!)))
; Adopt pauses, waiting on dog, which in turn is waiting on our
; dog-promise.
(realized? adopt-task) ; false
(task/dep-ids adopt-task) ; [5 4]

; Person completed immediately:
@person-task ; :nona

; Let's let the dog task run. Once it does, adopt can run too.
(deliver dog-promise :noodle)
; Immediately prints :nona :adopts :noodle

; Now we can deref the adoption task.
@adopt-task ; :adopted!
Tasks may be cancelled. Cancelling a task also cancels all tasks which depend on it. Unlike normal ThreadPoolExecutors, cancellation is guaranteed to be safe: if a task is still pending, it will never run. Cancelling a task which is running or has already run has no effect, other than removing it from the executor state. This may not be right for all applications; it's important for us.
(task/cancel! e adopt-task)
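A small sketch of cancelling a task that is still pending, under the semantics described above. The executor name `cancel-demo` and the `gate` promise are illustrative assumptions; the expected values in the comments follow from the docstrings rather than from a recorded run.

```clojure
(require '[jepsen.history.task :as task])

(def cancel-demo (task/executor))
(def gate (promise))

;; `upstream` blocks on the promise, so `downstream` cannot start yet.
(def upstream   (task/submit! cancel-demo :upstream (fn [_] @gate)))
(def downstream (task/submit! cancel-demo :downstream [upstream]
                              (fn [[x]] [:saw x])))

;; Still waiting on its dependency; should be true.
(task/pending? (task/executor-state cancel-demo) downstream)

;; Cancelling removes the task from the executor state...
(task/cancel! cancel-demo downstream)
(task/has-task? (task/executor-state cancel-demo) downstream) ; should be false

;; ...so releasing the upstream task no longer triggers it.
(deliver gate :done)
@upstream ; :done
```

Note that cancellation flows downstream only: `upstream` is a dependency of the cancelled task, not a dependent, so it still runs to completion.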
Tasks either run to completion or are cancelled; they are never interrupted. If they are, who knows what could happen? Almost certainly the executor will stall some tasks forever. Hopefully you're throwing away the executor and moving on with your life.
Unlike standard j.u.c executors, tasks in this system may be created, queried, and cancelled transactionally. The executor's state (`State`) is a persistent, immutable structure. You perform a transaction with `(txn! executor (fn [state] ...))`, which returns a new state. Any transformations you apply to the state take place atomically. Note that transaction functions use the pure variants without `!`: `submit` instead of `submit!`, etc.
; Create a task which blocks...
(def dog-promise (promise))
(def dog-task (task/submit! e :find-dog (fn [_] @dog-promise)))
; And one that depends on it. Its function receives a vector of
; dependency outputs, so we destructure the single dog.
(def adopt-task (task/submit! e :adopt-dog [dog-task]
  (fn [[dog]]
    (println :adopting dog)
    [:adopted dog])))

; adopt-task hasn't run because dog-task is still pending.
(task/txn! e
  (fn [state]
    ; Make sure the adoption hasn't happened yet
    (if-not (task/pending? state adopt-task)
      state
      ; If it hasn't happened yet, cancel the dog task. Adoption will be
      ; cancelled automatically.
      (let [state' (task/cancel state dog-task)
            ; And do something else
            [state' new-task] (task/submit state' :enter-tomb
                                (fn [_]
                                  (prn :oh-no)
                                  :tomb-entered))]
        state'))))
; prints :oh-no
(deliver dog-promise :noodle)
; Adoption never happens!
(add-dep-edge dep-graph dep task)
Takes a dependency graph, a dependency task, and a new task. Adds a dependency edge dep -> task iff dep is still pending, returning the new dep graph.
(add-task-helper state dep-graph task)
Helper for submit* and catch*. Takes a state, a dep graph, and new task, readies the task if possible, increments next-task-id, and creates effects, returning state.
(apply-effects! executor)
(apply-effects! executor [type task])
Attempts to apply all pending effects from an Executor's state. Side effects may be interleaved in any order. When an effect is applied it is removed from the effect queue. Returns executor. This is generally called automatically as tasks complete; you shouldn't need to do it yourself.
In the two-arity form, performs a specific effect.
(cancel state task)
Takes a state and a task to cancel. Deletes the task, and any tasks which depend on it, from every part of the state.
(cancel! executor task)
Cancels a task on an executor. Returns task.
(capture-throw & body)
Evaluates body in a try, capturing any exception thrown and returning it in a CapturedThrowable box. Helpful for error propagation.
(captured-throw? x)
Is something a captured throwable?
(catch state name dep f)
(catch state name data dep f)
Like catch*, but also returns the created catch task: [state catch-task]. This form is more convenient for transactional use.
Takes a state, a name, optional data, a task to depend on, and a function `f`. If that task throws, calls `(f exception)`; the result is the output of the catch task. Otherwise passes the dependency's output on unchanged.
(catch! executor name dep f)
(catch! executor name data dep f)
Submits a new catch task to an Executor. Takes a name for this task, optional data, a task to depend on, and a function `(f exception)` which will be called if the dependency throws. Returns a newly created Catch task, whose output is either the output of the dependency, or if the dependency throws, whatever `f` returns.
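As a rough illustration of both branches, the sketch below attaches a catch task to one dependency that throws and one that succeeds. The executor name `catch-demo`, the task names, and the handler's return value are made up for the example; the expected outputs follow from the description above.

```clojure
(require '[jepsen.history.task :as task])

(def catch-demo (task/executor))

(def flaky  (task/submit! catch-demo :flaky
                          (fn [_] (throw (ex-info "vet is closed" {})))))
(def steady (task/submit! catch-demo :steady (fn [_] :healthy)))

;; The handler should run only when the dependency throws.
(def recovered (task/catch! catch-demo :recover flaky
                            (fn [ex] :rescheduled)))
(def untouched (task/catch! catch-demo :passthrough steady
                            (fn [ex] :rescheduled)))

@recovered ; expected :rescheduled
@untouched ; expected :healthy
```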
(catch* state name dep f)
(catch* state name data dep f)
Takes a state, a name, optional data, a task to depend on, and a function `f`. If that task throws, calls `(f exception)`; the result is the output of the catch task. Otherwise passes the dependency's output on unchanged.
Returns a new state with this catch task added.
(data task)
Returns the custom data associated with a task.
(dep-ids task)
Returns a vector of dependency IDs of a task.
(deps state task)
Given a State and a Task, returns a vector of Tasks that this task depends on. Tasks are nil where the task is no longer known.
(executor)
Constructs a new Executor for tasks. Executors are mutable thread-safe objects.
(executor-state e)
Reads the current state of an Executor.
(finish state task)
Takes a state and a task which has been executed, and marks it as completed. This deletes the task from the running set and returns the resulting state. It may also result in new tasks being ready.
(finish-task! executor task)
Tells an executor that a task is finished. You probably don't need to call this yourself; tasks call this automatically.
(gc state goal to-delete)
Takes a state, a collection of goal tasks you'd like to achieve, and a set of tasks you might like to cancel. Cancels all tasks not contributing to the goal, returning a new state.
(get-task state task-id)
Fetches a Task from a state by ID. Returns nil if task is not known.
(has-task? state task)
Takes a state and a task. Returns true iff the state knows about this task.
(pending? state task)
Takes a state and a task. Returns true iff that task is still pending execution, and can be safely cancelled.
(pseudotask id)
One of the weird tricks we use (programmers HATE him!) is to abuse the id-only equality semantics of Task objects for efficient lookups in our internal graphs and sets. This constructs an empty task with the given ID. Note also that Tasks and Catches compare equal!
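The id-only equality trick can be sketched in plain Clojure. `IdTask` and its `label` field below are hypothetical stand-ins and not the library's Task type; the point is only that a bare "probe" object carrying the right ID can look up the real entry in a set.

```clojure
;; Hypothetical type whose equality and hash depend on the ID alone.
(deftype IdTask [^long id label]
  Object
  (hashCode [_] (Long/hashCode id))
  (equals [_ other]
    (and (instance? IdTask other)
         (= id (.-id ^IdTask other)))))

(def real  (IdTask. 7 :walk-dog)) ; a fully populated entry
(def probe (IdTask. 7 nil))       ; an empty stand-in with the same ID

(def by-id #{real})

(contains? by-id probe)      ; true: only the ID is compared
(.-label (get by-id probe))  ; :walk-dog, the stored entry comes back
```

The cost of this design is the caveat stated earlier: objects from different executors can share an ID and would then compare equal.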
(ran? task)
Returns true iff a task ran already.
(state)
Constructs a new executor state. States have six parts:
- The next task ID we'll hand out
- The executor wrapping this state. We need this to close the loop on task side effects.
- A graph of dependencies between tasks. Iff a -> b, b depends on a's results. Once tasks are completed, they're removed from this graph.
- A set of ready tasks which have no pending dependencies, but have not yet begun. These tasks are eligible to be executed by the threadpool.
- A set of running tasks we're currently executing. When the executor begins executing a task, it moves from the ready set to the running set.
- A list of unapplied side effects. Pure functions of the state add effects to the end of this list. They're applied by a mutable function (`apply-effects!`) later, and popped off the list as this occurs.
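The split between pure state functions and a deferred effect list can be sketched in miniature. Everything here (`new-state`, `add-task`, the map layout, and this toy `apply-effects!`) is a hypothetical simplification in plain Clojure, not the library's State or its real effect handling.

```clojure
;; Hypothetical miniature state: just an ID counter and an effect queue.
(defn new-state [] {:next-id 0, :effects []})

(defn add-task
  "Pure: allocates the next ID and queues a [:new-task ...] effect."
  [state task-name]
  (-> state
      (update :next-id inc)
      (update :effects conj
              [:new-task {:id (:next-id state), :name task-name}])))

(defn apply-effects!
  "Impure: empties the effect queue held in state-atom, then calls
  perform! on each effect that was queued."
  [state-atom perform!]
  (let [[old _] (swap-vals! state-atom assoc :effects [])]
    (doseq [effect (:effects old)]
      (perform! effect))
    state-atom))

;; Two pure steps queue two effects; nothing has happened yet.
(def demo-state (atom (-> (new-state) (add-task :a) (add-task :b))))
(def performed (atom []))

(apply-effects! demo-state #(swap! performed conj %))
@performed ; [[:new-task {:id 0, :name :a}] [:new-task {:id 1, :name :b}]]
```

Keeping the state functions pure is what lets a transaction compose several of them before any side effect is performed.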
(state-done? state)
Returns true when a state has nothing ready, nothing running, and nothing in the dependency graph; i.e. there is nothing more for it to do.
(state-queue state-atom)
A queue for our ThreadPoolExecutor which wraps an atom of a State. Tasks are pulled off the state's queue and placed into the running set. This lets us get around issues with handing off records from our immutable state (which is basically a queue!) to a standard mutable executor queue (which we can't mutate safely).
We're doing something kind of evil here: to avoid mutating the state twice, we actually:
1. Put the task into our state directly, using our own transactional fns
2. Call executor.execute(task)
3. The executor tries to put that task onto this queue; we return true and do nothing because we know we already have it.
4. The executor threads poll us, and we hand them tasks from the queue.
(state-queue-claim-task*! state vol)
A support function for StateQueue.run. Definitely don't call this yourself. I mean, I call it, but I've come to terms with my sins.
Takes a state and a volatile. Tries to move a task from the ready set to the running set, and if that works, sets the volatile to that task. Otherwise sets the volatile to nil.
(submit state name f)
(submit state name deps f)
(submit state name data deps f)
Like submit*, but also returns the created task: [state new-task]. This form is more convenient for transactional task creation.
Given a state, constructs a fresh task with the given name, optional data, optional list of Tasks as dependencies, and a function `(f dep-results)` which takes a vector of dependency results. Returns the state with the new task integrated, including a `[:new-task task]` effect, which you can use to read the created task.
Dependencies must either be in the graph already, or realized, to prevent deadlock.
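Because `submit` returns `[state new-task]`, a task created earlier in a transaction can serve as a dependency for one created later in the same transaction. The sketch below assumes this works as the docstrings suggest; `pipeline-demo`, `last-task`, and the task names are illustrative. The atom is one hypothetical way to get the created task out of the transaction function.

```clojure
(require '[jepsen.history.task :as task])

(def pipeline-demo (task/executor))
(def last-task (atom nil))

(task/txn! pipeline-demo
  (fn [state]
    (let [[state fetch] (task/submit state :fetch (fn [_] 1))
          [state bump]  (task/submit state :bump [fetch]
                                     (fn [[x]] (inc x)))]
      ;; reset! simply overwrites, so this stays harmless even if the
      ;; transaction function were to be run more than once.
      (reset! last-task bump)
      state)))

@@last-task ; expected 2
```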
(submit! executor name f)
(submit! executor name deps f)
(submit! executor name data deps f)
Submits a new task to an Executor. Takes a task name, optional data, an optional collection of Tasks as dependencies, and a function `(f dep-results)` which receives a vector of dependency results.
Returns a newly created Task object.
(submit* state name f)
(submit* state name deps f)
(submit* state name data deps f)
Given a state, constructs a fresh task with the given name, optional data, optional list of Tasks as dependencies, and a function `(f dep-results)` which takes a vector of dependency results. Returns the state with the new task integrated, including a `[:new-task task]` effect, which you can use to read the created task.
Dependencies must either be in the graph already, or realized, to prevent deadlock.
If the task has no dependencies, it may be ready immediately; the `:new-task` effect will be followed by a `[:ready-task task]` effect. In either case, the final effect will contain the newly created task.
(throw-captured x)
Rethrows a captured throwable.
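A short sketch of how `capture-throw`, `captured-throw?`, and `throw-captured` might be used together, based only on the descriptions on this page. It assumes `capture-throw` returns the body's value when nothing is thrown, and that `throw-captured` rethrows the original exception.

```clojure
(require '[jepsen.history.task :as task])

;; A body that returns normally: assumed to yield its plain value.
(def fine (task/capture-throw (+ 1 2)))

;; A body that throws: the exception comes back boxed instead.
(def boxed (task/capture-throw
             (throw (ex-info "leash snapped" {:dog :noodle}))))

(task/captured-throw? fine)  ; expected false
(task/captured-throw? boxed) ; expected true

;; Unbox by rethrowing, then handle the exception like any other.
(def recovered-data
  (try (task/throw-captured boxed)
       (catch clojure.lang.ExceptionInfo ex
         (ex-data ex))))
```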
(txn! executor f)
Executes a transaction on an executor. Takes an executor and a function which transforms that executor's State. Applies that function to the executor's state, then applies any pending side effects. Returns the state the function returned.
Locks executor queue, since txns may be ~expensive~ and the threadpool is constantly mutating it.
A dummy runnable which only exists to wake up a ThreadPoolExecutor. Might also be handy if you want to create tasks that do nothing, e.g. for dependency graph sequencing tricks.