(ns my-ns
(:require [#?(:clj com.wsscode.async.async-clj
:cljs com.wsscode.async.async-cljs)
:as wa
:refer [go-promise <?]]))
Core.async
is the standard way to handle async features in Clojure and ClojureScript programs.
Although core.async is built upon CSP, often (specially in CLJS) is desired to have something that’s more like Promises/Futures.
Core.async provides a promise-chan
, which is a channel that has a promise-like semantic:
after realization, any read on it will keep returning the same value. That’s helpful but
this doesn’t cover the problem of error propagation, in a promise system it’s expected
that errors can flow up to be captured by some higher level code.
This library provide some helpers to deal with this problem, and also:
helpers to integrate with Javascript Promises in ClojureScript environments.
helpers to write tests using core.async and cljs.test.
serialized event callback handling
ID
based async callbacks
This library uses separated namespaces for Clojure and ClojureScript, when using the
Clojure side, use the namespace com.wsscode.async.async-clj
, and for ClojureScript
use com.wsscode.async.async-cljs
. Unless pointed otherwise, the features on both
namespaces are the same.
If you want to use this with Clojure Common files, you can use the require like this:
(ns my-ns
(:require [#?(:clj com.wsscode.async.async-clj
:cljs com.wsscode.async.async-cljs)
:as wa
:refer [go-promise <?]]))
To deal with error propagation, the trick is to return the error object as the channel value, but also throw that error when reading the channel. Let’s illustrate that with an example:
(ns my-ns
(:require [com.wsscode.async.async-clj :refer [go-promise <?]))
(defn async-divide [x y]
; (1)
(go-promise
(/ x y)))
(defn run [d]
(go-promise
(try
; (2)
(<? (async-divide 6 d))
(catch Throwable _
"ERROR"))))
(comment
(<!! (run 2)) ; => 3 (3)
(<!! (run 0)) ; => "ERROR" (4)
)
1 | go-promise is similar to go , but will return a promise-channel, so in case this result gets
passed to multiple readers, all of them will get the realized value or error. This also
wraps the block in a try/catch, so if some exception happens it will get returned as the channel value. |
2 | <? works like <! , but once it reads a value, it will check if it’s an error, and
if so it will throw that error, propagating it up on the chain. |
3 | propagate value back |
4 | error propagated from async-divide trying to divide by zero |
By following this pattern of error capture and send, this system ends up working in the
same ways you would expect a promise, all while still in the same go
blocks, making
it compatible with standard core.async functionality. Later we will also talk about how
to integrate with Javascript Promises in this system.
Another difference when using go-promise
is that different from regular go
, you can
return nil
as a value. Since the promise will always return the same value, a nil
is not ambiguous.
In the implementation side, go-promise check if the value returned is nil , and
if so it just closes the channel, making it an effective nil value.
|
We just saw the helper <?
to do a take and check for errors, here is a list of other
take helpers provided by this library:
<?
- take and throw errors
<?!
(clj only) - take and throw errors blocking on thread
<!p
(cljs only) - take from JS Promise
<!maybe
- if param is a channel, take from it, otherwise return its value
<!!maybe
(clj only) - like <!maybe
but blocking the thread
<?maybe
- take and throw errors, return value if it’s not a channel
<?!maybe
(clj only) - like <?maybe
but blocking the thread
in ClojureScript <?maybe can also take from Promises
|
While working in Javascript it’s common to need to handle Promises, to help with this
there is a macro in this library that enables the read of JS promises as if they
were core.async
channels, the <!p
helper:
(ns my-ns
(:require [com.wsscode.async.async-cljs :refer [go-promise <? <!p]))
; (1)
(go-promise
(-> (js/fetch "/") <!p
(.text) <!p
js/console.log))
1 | Read the index text of the current domain, note we are waiting for two different promises in this example, the first one for the fetch headers and the second to get the body text. |
the way <!p works is by first converting the Promise into a core.async channel
and them read on that channel, for core.async sake it’s channels all the way.
|
Note that this strategy allows the mixing of both core.async
channels and promises
in the same system, you can both park for channels or promises.
Dealing with async tests in cljs.test can be annoying, the core doesn’t have any integration
with core.async, neither it handles common problems like timing out a test. This library
provides a helper called deftest-async
that aims to facilitate the tests of async core
using core.async. Example usage:
(ns com.wsscode.async.async-cljs-test
(:require [clojure.test :refer [is are run-tests async testing deftest]]
[com.wsscode.async.async-cljs :as wa :refer [deftest-async <! go]]))
(deftest-async my-test
(is (= "foo" (<! (go "foo")))))
This macro will do a couple of things:
It will wrap the body in a go-promise
block, allowing the use of parking operations
Try/catch this block, if any error happens (sync or async) that generates a test case that will fail with that error
Add a 2 seconds timeout, if the go
block doesn’t return in this time it will cancel and fail the test
You can configure the timeout duration, example:
(ns com.wsscode.async.async-cljs-test
(:require [clojure.test :refer [is are run-tests async testing deftest]]
[com.wsscode.async.async-cljs :as wa :refer [deftest-async <! go]]))
(deftest-async my-test
{::wa/timeout 5000} ; 5 seconds timeout
(is (= "foo" (<! (go "foo")))))
if you want to use this helper with a different test constructor (from Workspaces
or Devcards for example) you can use the wa/async-test helper instead
|
This library provides a helper to serialize async event callbacks. By default, if you do event handling like this:
(.on some-object "event"
(fn handler-fn [e]
(go
(-> (do-operation e) <!
(do-more) <!))))
In case many events come rapidly, the callbacks will run in between each other, a lot of times that’s not a problem, but if you need sequencing then this may get you in trouble.
To handle this you can use the event-queue!
helper:
(.on some-object "event"
(wap/event-queue!
(fn handler-fn [e]
(go
(-> (do-operation e) <!
(do-more) <!)))))
INFO: wap
is alias for com.wsscode.async.processing
The event-queue!
returns a new callback function that instead of calling handler
directly,
it will add the event to a queue for processing, in case the handler returns a channel,
that channel will be awaited before processing the next event, this a very easy and
quick way to ensure serialisation.
By default the queue will use one (async/chan (async/dropping-buffer 1024))
. You can
override it with:
(.on some-object "event"
(wap/event-queue! {::wap/channel (async/chan (async/sliding-buffer 1024))}
(fn handler-fn [e]
(go
(-> (do-operation e) <!
(do-more) <!)))))
One way to stop the processing from running is to send a custom channel, and when you want to stop processing you close it. Example:
(let [ch (async/chan (async/sliding-buffer 1024))]
(.on some-object "event"
(wap/event-queue! {::wap/channel ch}
(fn handler-fn [e]
(go
(-> (do-operation e) <!
(do-more) <!)))))
; later in the future
(async/close! ch))
If you use something like Websockets for communication, depending on the library you are using they may or may not include some way to handle callback events. To handle this (or any other case were message callbacks are not a native option) this library provides some helpers.
The idea is to send a message providing some ID
, and then wait for a response message
to come, the response will include the same ID
from the request, so they match.
This process happens in three main steps:
Once we send a message requiring a callback, create something to get notified once the response arrives
If you read a message that wants a response, create and send the response message
Listen to message responses on the event entry point
This mechanism assumes your message are maps.
To implement 1
, you create a function that wraps whatever your transmit function is:
(defn send-message! [msg]
(original-send-message! msg)
; this will check if the message has a request-id, and if so will create a channel
; that will have data available once the message is replied
(wap/await! msg))
Then, wrap your read side with the capture-response!
helper:
(defn handle-message [msg]
; this will fire the handler when the message contains ::wap/response-id, otherwise
; it lets the message flow
(if-not (wap/capture-response! msg)
(original-handle-msg msg)))
In your handle, to reply a message, to this:
(defn some-handler [msg]
(send-message! (wap/reply-message msg) "reply value"))
And finally, to issue a request and wait for the callback:
(go
(let [res (<? (send-message! (assoc msg ::wap/request-id (wap/random-request-id))))]
(print "Response: " res)))
INFO: The await!
helper has a built-in timeout mechanism, the default wait time is 5s.
There are other minor helpers not mentioned in this document, but they all have documentation on the functions, to check it out see the cljdoc page of this library.
Can you improve this documentation?Edit on GitHub
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close