The primitive machinate provides is Events. The fundamental operation on an event is synchronizing on it. Synchronizing on an event means waiting for that event to occur. When an event occurs a value is communicated to anyone synchronizing on the event. Events can be combined into a new event that represents an unordered choice of those events (select, alts, etc). Events can be guarded to add pre-synchronization actions, and wrapped to provide post-synschronization actions. There is a nack mechanism that allows for registering actions to take if an event is not the one selected when doing the unordered choice thing.
Machinate also provides channels(queues) for communicating values between threads of control. Sending and receiving data over channels is done by constructiong events that represent sending and receiving and then sync'ing on those events.
The following examples all assume:
(require '[com.manigfeald.machinate :as m])
m/sync!
does not block, but takes a callback to be run after an
event occurs. m/sync!!
blocks until the given event occurs and
returns and returns the value of the event.
nil
is the never event. It never synchronizes, so if you try you
will just wait forever. com.manigfeald.machinate/always
is an event
that always has already happend. Syncrhonizing on always succeeds
immiediately.
(m/sync!! nil) ; blocks forever
(m/sync!! m/always) ; never blocks
m/watch-reference
creates an event that occurs when some clojure
reference type (atom, ref, agent, anything add-watch works on really)
is observed to have some given value.
(def a (atom 0))
(def e (m/watch-reference a 1))
(m/sync! (m/wrap e (fn [_] (println "yo")))) ; "yo" doesn't print
(swap! a inc) ; e occurs and "yo" prints
m/wrap
and m/wrap-handler
take an event and create a new one that
will apply the given function to the original events value and then
use the result as the value of the new event.
m/wrap
is for normal values, m/wrap-handler
is for handling
errors.
(m/sync!! (m/wrap m/always (constantly :hello))) ; results in :hello
com.manigfeald.machinate/timeout
returns an event that occurs
sometime in the future after a given delay.
(m/sync!! (m/timeout 100)) ; blocks for 100 milliseconds
A rendevous channel is unbufffered. Senders and receivers must stop and wait for each other.
(def a-channel (m/channel))
(m/sync!! (m/send a-channel :some-message))
(m/sync!! (m/receive a-channel))
(m/sync!! (m/choice [(m/receive a-channel) (m/timeout 1000)]))
(m/close! a-channel)
Publishes 1s to channel c
until it is closed.
(def c (m/channel))
((fn this-fn [] (sync! (wrap (send c 1) (fn [was-sent] (when was-sent (this-fn)))))))
This example sends 1000 messages and then receivers them all.
(let [n 1000
cs (repeatedly n m/channel)
begin (System/currentTimeMillis)]
(doseq [c cs] (m/sync! (m/send c "hi")))
(loop [receives (for [c cs] (m/receive c))]
(when (seq m/receives)
(let [[the-val the-evt] (m/sync!! (m/alts! receives))]
(assert (= "hi" the-val))
(recur (remove #{the-evt} receives)))))
(println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))
(let [philosopher-count 10
hunger 100
eating-seconds (/ 1 100)
done (atom 0)
left-forks (vec (repeatedly philosopher-count #(m/buffered-channel 1)))
announcements (m/channel)
log (fn [& args]
(m/sync!! (m/send announcements args)))
yield-virtual-thread (fn []
(m/sync!! (m/timeout 0)))]
((fn l []
(m/sync!
(m/wrap (m/receive announcements)
(fn [args]
(apply println args)
(l))))))
(dotimes [i philosopher-count]
(let [left-fork (nth left-forks i)
right-fork (nth left-forks (mod (inc i) philosopher-count))]
(Thread/startVirtualThread
(fn []
(try
(log i 'seated)
(loop [h hunger
is-thinking false]
(yield-virtual-thread)
(when-not (neg? h)
(if (m/sync!! (m/ordered-choice [(m/receive left-fork)
(m/wrap m/always
(constantly false))]))
(if (m/sync!! (m/ordered-choice [(m/receive right-fork)
(m/wrap m/always
(constantly false))]))
(do
(log i 'eating h)
(m/sync!! (m/timeout (* (rand) 1000 eating-seconds)))
(m/sync!! (m/send left-fork true))
(m/sync!! (m/send right-fork true))
(recur (dec h) true))
(do
(m/sync!! (m/send left-fork true))
(recur h is-thinking)))
(do
(when-not is-thinking
(log i 'thinking))
(recur h true)))))
(log i 'satisfied)
(swap! done inc)
(log i 'left)
(catch Throwable t
(prn t)))))))
(doseq [fork left-forks] (m/sync! (m/send fork true)))
(m/sync!! (m/watch-reference done philosopher-count))
(log 'table-empty))
Can you improve this documentation?Edit on sourcehut
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close