jepsen.history.task

A transactional, dependency-graph oriented task scheduler. Provides a
stateful executor for CPU-bound tasks backed by a num-cores
ThreadPoolExecutor, and allows you to submit tasks to be run on that
executor.

  (require '[jepsen.history.task :as task])
  (def e (task/executor))

At a very high level, a task is a named function of optional dependencies
(inputs) which returns an output. Here's a task with no dependencies:

  (def pet (task/submit! e :pet-dog (fn [_] :petting-dog)))

Tasks are derefable with the standard blocking and nonblocking calls.
Derefing a task returns its output. You can check whether a task has
completed with `realized?`:

  @pet             ; :petting-dog
  (realized? pet)  ; true

If a task throws, its output is the Throwable it threw. Derefing that task
also throws, like Clojure futures. Exceptions propagate to dependents: tasks
which depend on a failed task will never execute, and if derefed, will throw
as well.

  (def doomed (task/submit! e :doomed (fn [_] (assert false))))
  ; All fine, until
  @doomed    ; throws Assert failed: false

Each task is assigned a unique long ID by its executor. Tasks should never be
used across executors; their hashcodes and equality semantics are, for
performance reasons, by ID *alone*.

  (task/id pet)   ; 0

Tasks also have names, which can be any non-nil object, and are used for
debugging & observability. Tasks can also carry an arbitrary data object,
which can be anything you like. You can use this to build more sophisticated
task management systems around this executor.

  (task/name pet)   ; :pet-dog

  (def train (task/submit! e :train {:tricks [:down-stay :recall]} nil
               (fn [_] :training-dog)))
  (task/data train)  ; {:tricks [:down-stay :recall]}

When submitted, tasks can depend on earlier tasks. When it executes, a task
receives a vector of the outputs of its dependencies. A task only executes
once its dependencies have completed, and will observe their memory effects.

  (def dog-promise (promise))
  (def dog-task    (task/submit! e :make-dog    (fn [_] @dog-promise)))
  (def person-task (task/submit! e :make-person (fn [_] :nona)))
  (def adopt-task  (task/submit! e :adopt [person-task dog-task]
                     (fn [[person dog]]
                       (prn person :adopts dog)
                       :adopted!)))
  ; Adopt pauses, waiting on dog, which in turn is waiting on our
  ; dog-promise.
  (realized? adopt-task)    ; false
  (task/dep-ids adopt-task) ; [5 4]

  ; Person completed immediately:
  @person-task   ; :nona

  ; Let's let the dog task run. Once it does, adopt can run too.
  (deliver dog-promise :noodle)
  ; Immediately prints :nona :adopts :noodle

  ; Now we can deref the adoption task.
  @adopt-task    ; :adopted!

Tasks may be cancelled. Cancelling a task also cancels all tasks which depend
on it. Unlike normal ThreadPoolExecutors, cancellation is *guaranteed* to be
safe: if a task is still pending, it will never run. Cancelling a task which
is running or has already run has no effect, other than removing it from the
executor state. This may not be right for all applications; it's important
for us.

  (task/cancel! e adopt-task)

Tasks either run to completion or are cancelled; the executor never
interrupts them. If something else does interrupt a task, who knows what
could happen? Almost certainly the executor will stall some tasks forever.
Hopefully you're throwing away the executor and moving on with your life.

Unlike standard j.u.c executors, tasks in this system may be created,
queried, and cancelled *transactionally*. The executor's state (`State`) is a
persistent, immutable structure. You perform a transaction with `(txn!
executor (fn [state] ...))`, which returns a new state. Any transformations
you apply to the state take place atomically. Note that inside a transaction
you use the pure functions without `!`: `submit` instead of `submit!`, etc.

  ; Create a task which blocks...
  (def dog-promise (promise))
  (def dog-task (task/submit! e :find-dog (fn [_] @dog-promise)))
  ; And one that depends on it
  (def adopt-task (task/submit! e :adopt-dog [dog-task]
    (fn [[dog]] (println :adopting dog) [:adopted dog])))

  ; adopt-task hasn't run because dog-task is still pending.
  (task/txn! e (fn [state]
    ; Make sure the adoption hasn't happened yet
    (if-not (task/pending? state adopt-task)
      state
      ; If it hasn't happened yet, cancel the dog task. Adoption will be
      ; cancelled automatically.
      (let [state' (task/cancel state dog-task)
            ; And do something else
            [state' new-task] (task/submit state' :enter-tomb
                                (fn [_]
                                  (prn :oh-no)
                                  :tomb-entered))]
        state'))))
  ; prints :oh-no
  (deliver dog-promise :noodle)
  ; Adoption never happens!

add-dep-edge clj

(add-dep-edge dep-graph dep task)

Takes a dependency graph, a dependency task, and a new task. Adds a dependency edge dep -> task iff dep is still pending, returning the new dep graph.

add-task-helper clj

(add-task-helper state dep-graph task)

Helper for submit* and catch*. Takes a state, a dep graph, and new task, readies the task if possible, increments next-task-id, and creates effects, returning state.

apply-effects! clj

(apply-effects! executor)
(apply-effects! executor [type task])

Attempts to apply all pending effects from an Executor's state. Side effects may be interleaved in any order. When an effect is applied it is removed from the effect queue. Returns executor. This is generally called automatically as tasks complete; you shouldn't need to do it yourself.

In the two-arity form, performs a specific effect.

ATask clj

cancel clj

(cancel state task)

Takes a state and a task to cancel. Deletes the task, and any tasks which depend on it, from every part of the state.

cancel! clj

(cancel! executor task)

Cancels a task on an executor. Returns task.

capture-throw clj macro

(capture-throw & body)

Evaluates body in a try, capturing any exception thrown and returning it in a CapturedThrowable box. Helpful for error propagation.

captured-throw? clj

(captured-throw? x)

Is something a captured throwable?

catch clj

(catch state name dep f)
(catch state name data dep f)

Like catch*, but also returns the created catch task: [state catch-task]. This form is more convenient for transactional use.

Takes a state, a name, optional data, a task to depend on, and a function f. If task throws, calls (f exception); the result is the output of the catch task. Otherwise passes on input unchanged.

catch! clj

(catch! executor name dep f)
(catch! executor name data dep f)

Submits a new catch task to an Executor. Takes a name for this task, optional data, a task to depend on, and a function (f exception) which will be called if the dependency throws. Returns a newly created Catch task, whose output is either the output of the dependency, or if the dependency throws, whatever f returns.

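A minimal sketch of `catch!` recovering from a failed dependency, assuming the jepsen.history library is on the classpath. The task names `:risky` and `:recover` and the recovery value are made up for illustration; the comments restate what the docstring describes rather than verified output.

```clojure
(require '[jepsen.history.task :as task])

(def e (task/executor))

;; A task that always throws.
(def risky
  (task/submit! e :risky
                (fn [_] (throw (ex-info "boom" {:code 42})))))

;; A catch task depending on risky. Per the docstring, f is only called
;; (with the exception) if the dependency throws.
(def recovered
  (task/catch! e :recover risky
               (fn [ex] [:recovered (ex-message ex)])))

;; Deref blocks until the catch task has run, then yields whatever f returned.
(def result @recovered)
```

Derefing `risky` itself would still throw; only the catch task's output is replaced by the return value of `f`.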

catch* clj

(catch* state name dep f)
(catch* state name data dep f)

Takes a state, a name, optional data, a task to depend on, and a function f. If task throws, calls (f exception); the result is the output of the catch task. Otherwise passes on input unchanged.

Returns a new state with this catch task added.

data clj

(data task)

Returns the custom data associated with a task.

dep-ids clj

(dep-ids task)

Returns a vector of dependency IDs of a task.

deps clj

(deps state task)

Given a State and a Task, returns a vector of Tasks that this task depends on. Tasks are nil where the task is no longer known.

executor clj

(executor)

Constructs a new Executor for tasks. Executors are mutable thread-safe objects.

executor-state clj

(executor-state e)

Reads the current state of an Executor.

finish clj

(finish state task)

Takes a state and a task which has been executed, and marks it as completed. This deletes the task from the running set and returns the resulting state. It may also result in new tasks being ready.

finish-task! clj

(finish-task! executor task)

Tells an executor that a task is finished. You probably don't need to call this yourself; tasks call this automatically.

gc clj

(gc state goal to-delete)

Takes a state, a collection of goal tasks you'd like to achieve, and a set of tasks you might like to cancel. Cancels all tasks not contributing to the goal, returning a new state.

get-task clj

(get-task state task-id)

Fetches a Task from a state by ID. Returns nil if task is not known.

has-task? clj

(has-task? state task)

Takes a state and a task. Returns true iff the state knows about this task.

id clj

(id task)

Returns the ID of a task.

name clj

(name task)

Returns the name of a task.

pending? clj

(pending? state task)

Takes a state and a task. Returns true iff that task is still pending execution, and can be safely cancelled.

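The state-query functions (`executor-state`, `pending?`, `has-task?`, `get-task`, `deps`) all operate on an immutable snapshot, so they can be exercised together. This is a minimal sketch, assuming the jepsen.history library is on the classpath; the names `gate`, `:blocker`, and `:child` are invented for illustration.

```clojure
(require '[jepsen.history.task :as task])

(def e (task/executor))

;; A task that blocks until we deliver the promise, and one that depends on it.
(def gate    (promise))
(def blocker (task/submit! e :blocker (fn [_] @gate)))
(def child   (task/submit! e :child [blocker] (fn [[x]] [:got x])))

;; Take a snapshot of the executor's state and query it.
(def snapshot       (task/executor-state e))
(def child-pending? (task/pending? snapshot child))           ; child can't run yet
(def child-known?   (task/has-task? snapshot child))
(def child-by-id    (task/get-task snapshot (task/id child)))
(def child-deps     (task/deps snapshot child))               ; vector of Tasks

;; Unblock the dependency; child then runs with blocker's output.
(deliver gate :go)
(def child-output @child)
```

Because the snapshot is immutable, these answers describe the moment `executor-state` was called; the executor may have moved on by the time you act on them, which is what `txn!` is for.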

pseudotask clj

(pseudotask id)

One of the weird tricks we use (programmers HATE him!) is to abuse the id-only equality semantics of Task objects for efficient lookups in our internal graphs and sets. This constructs an empty task with the given ID. Note also that Tasks and Catches compare equal!

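A small sketch of what ID-only equality buys you, assuming the jepsen.history library is on the classpath and that tasks hash and compare by ID as described above. The names `t` and `stand-in` are illustrative.

```clojure
(require '[jepsen.history.task :as task])

(def e (task/executor))
(def t (task/submit! e :real-task (fn [_] :done)))

;; An empty stand-in carrying only t's ID.
(def stand-in (task/pseudotask (task/id t)))

;; Equality and hashing are by ID alone, so the stand-in matches the real
;; task and can be used as a lookup key in sets and maps.
(def same?  (= t stand-in))
(def found? (contains? #{t} stand-in))
```

This is also why tasks from different executors must never be mixed: two unrelated tasks with the same ID would compare equal.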

ran? clj

(ran? task)

Returns true iff a task ran already.

state clj

(state)

Constructs a new executor state. States have six parts:

  • The next task ID we'll hand out

  • The executor wrapping this state. We need this to close the loop on task side effects.

  • A graph of dependencies between tasks. Iff a -> b, b depends on a's results. Once tasks are completed, they're removed from this graph.

  • A set of ready tasks which have no pending dependencies, but have not yet begun. These tasks are eligible to be executed by the threadpool.

  • A set of running tasks we're currently executing. When the executor begins executing a task, it moves from the ready set to the running set.

  • A list of unapplied side effects. Pure functions of the state add effects to the end of this list. They're applied by a mutable function (apply-effects!) later, and popped off the list as this occurs.

state-done? clj

(state-done? state)

Returns true when a state has nothing ready, nothing running, and nothing in the dependency graph; i.e. there is nothing more for it to do.

state-queue clj

(state-queue state-atom)

A queue for our ThreadPoolExecutor which wraps an atom of a State. Tasks are pulled off the state's queue and placed into the running set. This lets us get around issues with handing off records from our immutable state (which is basically a queue!) to a standard mutable executor queue (which we can't mutate safely).

We're doing something kind of evil here: to avoid mutating the state twice, we actually:

  1. Put the task into our state directly, using our own transactional fns
  2. Call executor.execute(task)
  3. The executor tries to put that task onto this queue; we return true and do nothing because we know we already have it.
  4. The executor threads poll us, and we hand them tasks from the queue.

state-queue-claim-task*! clj

(state-queue-claim-task*! state vol)

A support function for StateQueue.run. Definitely don't call this yourself. I mean, I call it, but I've come to terms with my sins.

Takes a state and a volatile. Tries to move a task from the ready set to the running set, and if that works, sets the volatile to that task. Otherwise sets the volatile to nil.

submit clj

(submit state name f)
(submit state name deps f)
(submit state name data deps f)

Like submit*, but also returns the created task: [state new-task]. This form is more convenient for transactional task creation.

Given a state, constructs a fresh task with the given name, optional data, optional list of Tasks as dependencies, and a function (f dep-results) which takes a vector of dependency results. Returns the state with the new task integrated, including a [:new-task task] effect, which you can use to read the created task.

Dependencies must either be in the graph already, or realized, to prevent deadlock.

submit! clj

(submit! executor name f)
(submit! executor name deps f)
(submit! executor name data deps f)

Submits a new task to an Executor. Takes a task name, optional data, an optional collection of Tasks as dependencies, and a function (f dep-results) which receives a vector of dependency results, in the same order as the dependencies. Returns a newly created Task object.

submit* clj

(submit* state name f)
(submit* state name deps f)
(submit* state name data deps f)

Given a state, constructs a fresh task with the given name, optional data, optional list of Tasks as dependencies, and a function (f dep-results) which takes a vector of dependency results. Returns the state with the new task integrated, including a [:new-task task] effect, which you can use to read the created task.

Dependencies must either be in the graph already, or realized, to prevent deadlock.

If the task has no dependencies, it may be ready immediately; the :new-task effect will be followed by a [:ready-task task] effect. In either case, the final effect will contain the newly created task.

throw-captured clj

(throw-captured x)

Rethrows a captured throwable.

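The three error-boxing helpers (`capture-throw`, `captured-throw?`, `throw-captured`) fit together roughly as follows. This is a sketch based only on their docstrings, assuming the jepsen.history library is on the classpath; the exception message and data are invented.

```clojure
(require '[jepsen.history.task :as task])

;; Box an exception instead of letting it propagate.
(def boxed
  (task/capture-throw
    (throw (ex-info "nope" {:why :example}))))

;; Distinguish a boxed throwable from an ordinary value.
(def boxed?  (task/captured-throw? boxed))
(def normal? (task/captured-throw? :fine))

;; Rethrow later, e.g. at the point where the value is finally consumed.
(def rethrown
  (try
    (task/throw-captured boxed)
    (catch Throwable t t)))
```

Boxing lets a failure travel through the dependency graph as an ordinary output value and only surface when someone derefs it.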

txn! clj

(txn! executor f)

Executes a transaction on an executor. Takes an executor and a function which transforms that executor's State. Applies that function to the executor's state, then applies any pending side effects. Returns the state the function returned.

Locks executor queue, since txns may be ~expensive~ and the threadpool is constantly mutating it.

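A minimal sketch of creating two linked tasks in one transaction, assuming the jepsen.history library is on the classpath. The volatile `created` is a hypothetical way to carry the new tasks out of the transaction function; it is written so that it stays harmless even if the function were invoked more than once, which is an assumption rather than documented behavior.

```clojure
(require '[jepsen.history.task :as task])

(def e (task/executor))

;; Somewhere to stash the tasks created inside the transaction.
(def created (volatile! nil))

(task/txn! e
  (fn [state]
    ;; Pure `submit` returns [state' new-task]; thread the state through.
    (let [[state a] (task/submit state :one (fn [_] 1))
          [state b] (task/submit state :two [a] (fn [[x]] (inc x)))]
      (vreset! created {:a a :b b})
      ;; The returned state becomes the executor's new state.
      state)))

;; Both tasks were added atomically; b receives a's output in a vector.
(def total @(:b @created))
```

No other thread can observe a state containing `:one` but not `:two`, which is the point of doing this in a transaction rather than with two `submit!` calls.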

void-runnable clj

A dummy runnable which only exists to wake up a ThreadPoolExecutor. Might also be handy if you want to create tasks that do nothing, e.g. for dependency graph sequencing tricks.
