Introduction to machinate

The primitive machinate provides is the event. The fundamental operation on an event is synchronizing on it, which means waiting for that event to occur. When an event occurs, a value is communicated to everyone synchronizing on it. Events can be combined into a new event that represents an unordered choice among them (select, alts, etc.). Events can be guarded to add pre-synchronization actions and wrapped to add post-synchronization actions. There is also a nack mechanism for registering actions to take when an event is not the one selected in an unordered choice.

Machinate also provides channels (queues) for communicating values between threads of control. Sending and receiving data over channels is done by constructing events that represent the send and the receive, and then syncing on those events.

The following examples all assume:

(require '[com.manigfeald.machinate :as m])

Sync

m/sync! does not block; it takes a callback to run after the event occurs. m/sync!! blocks until the given event occurs and returns the value of the event.

Never and always

nil is the never event. It never occurs, so synchronizing on it waits forever. com.manigfeald.machinate/always is an event that has always already happened. Synchronizing on always succeeds immediately.

(m/sync!! nil) ; blocks forever

(m/sync!! m/always) ; never blocks

Clojure reference types

m/watch-reference creates an event that occurs when a Clojure reference type (atom, ref, agent, or anything else add-watch works on) is observed to have a given value.

(def a (atom 0))

(def e (m/watch-reference a 1))

(m/sync! (m/wrap e (fn [_] (println "yo")))) ; "yo" doesn't print

(swap! a inc) ; e occurs and "yo" prints
 

Wrapping

m/wrap and m/wrap-handler take an event and create a new one that applies the given function to the original event's value and uses the result as the value of the new event.

m/wrap is for normal values, m/wrap-handler is for handling errors.

(m/sync!! (m/wrap m/always (constantly :hello))) ; results in :hello
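
Because a wrapped event is itself an event, wraps can be nested. A small sketch, assuming nested wraps compose as described above (the inner function runs first and its result feeds the outer one); the name one-then-inc is made up for this example:

```clojure
(require '[com.manigfeald.machinate :as m])

;; The inner wrap replaces the value of m/always with 1;
;; the outer wrap then applies inc to that result.
(def one-then-inc
  (m/wrap (m/wrap m/always (constantly 1)) inc))

(m/sync!! one-then-inc) ; expected to result in 2, going by the description of m/wrap
```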

Timeouts

com.manigfeald.machinate/timeout returns an event that occurs sometime in the future after a given delay.

(m/sync!! (m/timeout 100)) ; blocks for 100 milliseconds

Create a rendezvous channel

A rendezvous channel is unbuffered. Senders and receivers must stop and wait for each other.

(def a-channel (m/channel))

Send over a channel

(m/sync!! (m/send a-channel :some-message))

Receive over a channel

(m/sync!! (m/receive a-channel))
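
Since an unbuffered channel makes the sender and receiver wait for each other, a blocking send and a blocking receive need to run on different threads of control. A minimal sketch, using a Clojure future for the sender; the name ping-channel is made up for this example:

```clojure
(require '[com.manigfeald.machinate :as m])

(def ping-channel (m/channel))

;; The sender blocks on another thread until a receiver shows up.
(future (m/sync!! (m/send ping-channel :ping)))

;; The receiver meets the sender and gets the value that was sent.
(m/sync!! (m/receive ping-channel)) ; expected to result in :ping
```

Running the blocking send and the blocking receive one after the other on a single thread would leave the send waiting for a receiver that never arrives.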

Receive from a channel or timeout

(m/sync!! (m/choice [(m/receive a-channel) (m/timeout 1000)]))
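
The choice above yields the value of whichever event occurs first, which can make it hard to tell a received message from a timeout. One way to distinguish them, sketched here on the assumption that m/wrap can be applied to each alternative inside m/choice, is to tag each branch. The channel quiet-channel and the tags :received and :timed-out are made up for this example:

```clojure
(require '[com.manigfeald.machinate :as m])

;; Hypothetical channel that nobody sends on.
(def quiet-channel (m/channel))

(m/sync!!
 (m/choice [(m/wrap (m/receive quiet-channel) (fn [v] [:received v]))
            (m/wrap (m/timeout 1000) (constantly [:timed-out]))]))
;; with no sender, this should result in [:timed-out] after about a second
```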

Close a channel

(m/close! a-channel)

A channel of 1s

Publishes 1s to channel c until it is closed.

(def c (m/channel))

((fn this-fn []
   (m/sync! (m/wrap (m/send c 1)
                    (fn [was-sent] (when was-sent (this-fn)))))))

A port of a core.async example

This example sends 1000 messages and then receives them all.

(let [n 1000
      cs (repeatedly n m/channel)
      begin (System/currentTimeMillis)]
  (doseq [c cs] (m/sync! (m/send c "hi")))
  (loop [receives (for [c cs] (m/receive c))]
    (when (seq receives)
      (let [[the-val the-evt] (m/sync!! (m/alts! receives))]
        (assert (= "hi" the-val))
        (recur (remove #{the-evt} receives)))))
  (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))

Dining philosophers

(let [philosopher-count    10
      hunger               100
      eating-seconds       (/ 1 100)
      done                 (atom 0)
      left-forks           (vec (repeatedly philosopher-count #(m/buffered-channel 1)))
      announcements        (m/channel)
      log                  (fn [& args]
                             (m/sync!! (m/send announcements args)))
      yield-virtual-thread (fn []
                             (m/sync!! (m/timeout 0)))]
  ((fn l []
     (m/sync!
      (m/wrap (m/receive announcements)
              (fn [args]
                (apply println args)
                (l))))))
  (dotimes [i philosopher-count]
    (let [left-fork  (nth left-forks i)
          right-fork (nth left-forks (mod (inc i) philosopher-count))]
      (Thread/startVirtualThread
       (fn []
         (try
           (log i 'seated)
           (loop [h           hunger
                  is-thinking false]
             (yield-virtual-thread)
             (when-not (neg? h)
               (if (m/sync!! (m/ordered-choice [(m/receive left-fork)
                                                (m/wrap m/always
                                                        (constantly false))]))
                 (if (m/sync!! (m/ordered-choice [(m/receive right-fork)
                                                  (m/wrap m/always
                                                          (constantly false))]))
                   (do
                     (log i 'eating h)
                     (m/sync!! (m/timeout (* (rand) 1000 eating-seconds)))
                     (m/sync!! (m/send left-fork true))
                     (m/sync!! (m/send right-fork true))
                     (recur (dec h) true))
                   (do
                     (m/sync!! (m/send left-fork true))
                     (recur h is-thinking)))
                 (do
                   (when-not is-thinking
                     (log i 'thinking))
                   (recur h true)))))
           (log i 'satisfied)
           (swap! done inc)
           (log i 'left)
           (catch Throwable t
             (prn t)))))))
  (doseq [fork left-forks] (m/sync! (m/send fork true)))
  (m/sync!! (m/watch-reference done philosopher-count))
  (log 'table-empty))
