A small wrapper on top of virtual threads introduced in Java 21.
The recent release of Java 21 introduced virtual threads to the scene. It's a nice feature that allows you to run imperative code, such as it was written in an asynchronous way. This library is a naive attempt to gain something from the virtual threads.
Lein
[com.github.igrishaev/virtuoso "0.1.1"]
Deps/CLI
{com.github.igrishaev/virtuoso {:mvn/version "0.1.1"}}
TL;DR: why making the new API? Because unlike v1 and v2, the third version was heavily tested in production. It has spotted some weaknesses in v2 but I don't want to introduce breaking changes. Thus, it's safer to ship a new module (done right this time, I hope).
The virtuoso.v3 namespace provides a number of functions and macros. Most of
them mimic their counterparts from the clojure.core namespace, but are
enforced by virtual threads.
(ns demo
(:require
[virtuoso.v3 :as v]))
The thread macro runs a block if code in a virtual thread. You'll get an
instance of java.lang.VirtualThread. The thread is started immediately. You
can .join it if you want.
(def -t (v/thread
(Thread/sleep 1000)
(+ 1 2)))
(.join -t)
nil
Usually, you cannot return a value from a thread (in a normal way). Use this macro only when you're not interested in a result.
The future macro runs a virtual thread similar to the thread macro. The
diffenrece is, you'll get an instance of CompletableFuture which gets
completed either normally or exceptionally should an exception pops up. The
result can be processed with deref or any future-... function:
(def -f (v/future
(Thread/sleep 1000)
(+ 1 2)))
(future? -f) ;; true
(future-done? -f) ;; true
@-f ;; 3
(def -failed
(v/future (/ 0 0)))
(deref -failed)
;; Execution error (ArithmeticException) at ...
;; Divide by zero
There are two macros called with-executor and future-via working in
pair. The first macro temporary opens a new virtual ExecutorService and binds
it to a certain variable. Pass this executor into the future-via macro so the
task is bound to this specific executor. The with-executor macro closes the
executor when exiting which guarantees all pending tasks are completed (normally
or exceptionally).
(v/with-executor [exe]
(let [a (v/future-via [exe]
(Thread/sleep 1000)
(+ 1 2))
b (v/future-via [exe]
(Thread/sleep 1000)
(+ 4 5))]
(+ @a @b)))
;; 12
The map function acts like clojure.core/map does but:
deref-ed items processed one by one.(def -result
(v/map (fn [a b]
(Thread/sleep 100)
(+ a b))
(range 10)
(range 10)))
(0 2 4 6 8 10 12 14 16 18)
Pay attention: if you perform HTTP calls or file IO for each item, apparently
you might hit the global 1024 ulimit constraint. Java will throw an exception
saying "too many open connections" or something. For such cases, it's better to
use the pmap function that acts through chunks (see below).
The pmap function is similar to clojure.core/pmap as it splits incoming
data on chunks. Each chunk of items is served within a dedicated virtual
executor. The next chunk won't start until the current one is complete. The
leading n parameter determines the chunk size. While working with HTTP calls,
that's ok to pass 512 or something similar (unless you have a custom ulimit
alue set). The function returns a lazy sequence of deref-ed values.
(def -result
(v/pmap 512
(fn [a b]
(Thread/sleep 1000)
(+ a b))
(range 1000)
(range 1000)))
(count -result) ;; takes ~2 seconds
1000
Better example: download 50k files from S3 by chunks of 1000:
(def -result
(v/pmap 1000
(fn [url]
(-> url
(client/get {:as :stream})
(:body)
(process-input-stream)))
(get-urls-to-fetch...)))
The fmap function is a low-level function which pmap is based on. It
returns a chunked sequence of futures. It's up to you how to handle them:
(def -futs
(v/fmap 512
(fn [a b]
(Thread/sleep 100)
(+ a b))
(range 1000)
(range 1000)))
(take 5 -futs)
(#object[ThreadBoundFuture ...[Completed normally]"]
#object[ThreadBoundFuture ...[Completed normally]"]
#object[ThreadBoundFuture ...[Completed normally]"]
#object[ThreadBoundFuture ...[Completed normally]"]
#object[ThreadBoundFuture ...[Completed normally]"])
The pvalues macro acts like clojure.core/pvalues forms are executed
within a virtual executor which gets closed afterwards. The result a lazy
sequence which iterating, derefs futures.
(v/pvalues
(+ 1 2)
(let [a 3 b 4]
(Thread/sleep 100)
(+ a b))
(* 5 6))
;; (3 7 30)
The for macro mimics the standard clojure.core/for but each body is run
in a virtual future. These futures are global meaning they are not bound to a
dedicated virtual executor. The result is a sequence of deref-ed values:
(v/for [a [1 2 3]
b [:a :b :c]
:when (not= [a b] [2 :b])
:let [c (* a a)]]
{:c c :b b})
({:c 1, :b :a}
{:c 1, :b :b}
{:c 1, :b :c}
{:c 4, :b :a}
{:c 4, :b :c}
{:c 9, :b :a}
{:c 9, :b :b}
{:c 9, :b :c})
Moved to a legacy V2 doc file.
Moved to a legacy V1 doc file.
There is a development dev/src/bench.clj module with some benchmarks. Imagine
we want to download 100 large files using map, pmap and virtual
threads. Before we do this, let's mimic real environment as follows:
server {
listen 8080;
server_name localhost;
...
location /hugefile.bin {
root html;
limit_rate 500k;
}
}
Now when you curl that file, it will be v-v-very slow.
The idea behind this trick is to mimic real IO expectation. Without limiting
throughput, the standard map outperforms both pmap and virtual threads just
because networking is too fast.
Now that the file is served in a slow manner, prepare a function that downloads it into nowhere:
(def URL "http://127.0.0.1:8080/hugefile.bin")
(defn download [i]
(with-open [in ^java.io.InputStream
(:body (client/get URL {:as :stream}))
out
(java.io.OutputStream/nullOutputStream)]
(.transferTo in out)))
Let's download it 100 times in different ways:
(time
(count
(map download SEQ)))
;; Elapsed time: 1102802.057709 msecs
(time
(count
(pmap download SEQ)))
;; Elapsed time: 44213.30375 msecs
(time
(count
(v3/map download SEQ)))
;; Elapsed time: 11124.417959 msecs
(time
(count
(v3/pmap 512 download SEQ)))
;; Elapsed time: 11090.514792 msecs
The standard map function lasts forever because it downloads files one by
one. If the file size is 6 megabyes and the rate limit is 500 kbs, it will take
12 secods to fetch it. Therefore, downloading 100 files takes 12 sec * 100 =
1200 seconds = 20 minutes.
The pmap function behaves better of course as it parallels jobs. My laptop has
got 12 CPUs meaning that, theoretically, it can download 14 files simultaneously
(pmap window size = CPU + 2). Above, downloading 100 files takes 44 seconds.
Now, the two map functions powered with virtual threads. One soon as one
virtual thread emits a blocking IO call, its stack trace replaced with a stack
trace of another thread that has just woken up from blocking IO. In our case,
all 100 files get downloaded in parallel, and the final time is 12 seconds. It
took as longs as to download a single file -- but we got 100 files.
A quick example of breaking ulimit constrain. 100 (files) is less than 1024
(default ulimit) so we're not breaking it. Now imagine we'd like to download
2000 files using virtual thread. This is what will happen:
(time
(count
(v3/map download (range 2000))))
INFO: I/O exception (java.net.SocketException) caught
when processing request to {}->http://127.0.0.1:8080: Connection reset
... org.apache.http.impl.execchain.RetryExec execute
INFO: Retrying request to {}->http://127.0.0.1:8080
Other HTTP clients may fail with "too many open connections" error. The right
approach would be to use v/pmap with a window size:
(time
(count
(v3/pmap 512 download (range 2000))))
;; Elapsed time: 44684.907583 msecs
2000 files in 45 seconds! It means, every second were downloading about 44 files.
The following links helped me a lot to dive into virtual threads, and I highly recommend reading and watching them:
©©©©©©©©©©©©©©©©©©©©©©©©©©©©©©©©©©
Ivan Grishaev, 2023. © UNLICENSE ©
©©©©©©©©©©©©©©©©©©©©©©©©©©©©©©©©©©
Can you improve this documentation?Edit on GitHub
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |