Bulkhead (concurrency limiter)

The bulkhead pattern prevents faults in one part of the system from taking down the entire system by limiting the number of concurrent calls.

This SO answer explains the concept very well.

Getting Started

There are two different implementations: :sync and :async.

  • :async (or :executor) - runs the task in an executor.
  • :sync (or :semaphore) - runs the task in the current thread.

Independently of the implementation, the bulkhead works in this way:

  • Checks whether the concurrency limit has been reached; if not, it proceeds to execute the function in the underlying executor (or in the same thread).
  • If the concurrency limit is reached, it queues the execution until other tasks finish (using a queue in async, or just blocking the current thread in the case of sync).
  • If the queue limit is reached, it returns a rejection.
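
The three steps above can be sketched with a plain java.util.concurrent.Semaphore. This is a conceptual illustration of the sync (blocking) variant only, not promesa's actual implementation; make-bulkhead and run-limited are hypothetical names introduced for this example.

```clojure
(import '[java.util.concurrent Semaphore]
        '[java.util.concurrent.atomic AtomicInteger])

;; Hypothetical helper (not part of promesa): state for a tiny blocking bulkhead.
(defn make-bulkhead
  [permits max-queue]
  {:semaphore (Semaphore. (int permits))
   :waiting   (AtomicInteger. 0)
   :max-queue max-queue})

;; Hypothetical helper (not part of promesa).
(defn run-limited
  "Runs f in the calling thread while holding one permit.
  Throws ex-info when no permit is free and max-queue callers already wait."
  [{:keys [^Semaphore semaphore ^AtomicInteger waiting max-queue]} f]
  ;; Step 1: take a permit right away if the concurrency limit is not reached.
  (when-not (.tryAcquire semaphore)
    ;; Step 3: too many callers are already waiting, so reject.
    (when (> (.incrementAndGet waiting) max-queue)
      (.decrementAndGet waiting)
      (throw (ex-info "bulkhead queue is full" {:type :rejected})))
    ;; Step 2: block the current thread until a running task releases a permit.
    (try
      (.acquire semaphore)
      (finally
        (.decrementAndGet waiting))))
  (try
    (f)
    (finally
      ;; Always give the permit back, even if f throws.
      (.release semaphore))))

(def bh (make-bulkhead 1 0))

(run-limited bh (fn [] (+ 1 2)))
;; => 3
```

With one permit and a queue size of 0, a second caller arriving while the first task is still running is rejected immediately instead of waiting.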

So let's start with some examples:

(require '[promesa.exec.bulkhead :as pxb]
         '[promesa.exec :as px])

;; All parameters are optional and have defaults
(def instance (pxb/create :type :async :permits 1 :queue 16))

@(px/submit instance
            (fn []
              (Thread/sleep 1000)
              1))
;; => 1

At first glance, this looks like an executor instance because it exposes the same API (the px/submit call). The difference is that the sync (semaphore-based) bulkhead implementation executes the task in the current thread, blocking the caller on submit.

In addition to the executor interface, it also implements the new IInvokable protocol, so you can execute a synchronous call (independently of the final bulkhead implementation) thanks to the px/invoke helper. Internally, on the async bulkhead this is just a combination of px/submit with px/join, but on the sync bulkhead implementation it does not use the CompletableFuture machinery and just executes the function with almost no indirection.

NOTE: The :sync bulkhead works fine with JVM virtual threads.

Available options

The create function accepts the following parameters, with these defaults:

  • :type: can be :sync or :async
  • :executor: only used with the async type; allows providing a custom executor. If not provided, the default one is used
  • :permits: the maximum number of permits (concurrent jobs) allowed (defaults to 1)
  • :queue: the maximum number of queued jobs allowed (defaults to Integer/MAX_VALUE); when the maximum queue size is reached, the task submission is rejected
  • :timeout: maximum time to wait synchronously to enqueue the task. On async it means the time to put the task in the internal queue; on sync it means the time to wait to acquire the semaphore. If not specified and the maximum queue size is reached, an exception is raised indicating that the queue is full.
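
As a configuration sketch, the options above might be combined as follows. It uses only the pxb/create call and the option names listed here; the numeric values and the var names async-bulkhead and sync-bulkhead are illustrative assumptions, not recommendations.

```clojure
(require '[promesa.exec.bulkhead :as pxb])

;; Async bulkhead: up to 4 tasks run at once on the default executor,
;; and up to 64 more may wait in the queue before submissions are rejected.
;; (Values are illustrative.)
(def async-bulkhead
  (pxb/create :type :async :permits 4 :queue 64))

;; Sync bulkhead: tasks run in the calling thread, at most 2 at a time.
;; :queue is omitted, so it keeps its default.
(def sync-bulkhead
  (pxb/create :type :sync :permits 2))
```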

For backward compatibility we still preserve the old types:

  • :executor alias for :async
  • :semaphore alias for :sync

Can you improve this documentation? These fine people already did:
Andrey Antukh & Stephen Hopper