A lightweight, scalable messaging system.
We run some number of nodes running our server, which has a simple "junction" concept within it: a central bus where you can send or receive messages to a topic ID. Messages are transient; they are either sent to a listener, or they are discarded. Sending messages uses a timeout to "block" sending the message until a consumer appears.
Clients use a cluster-aware junction client that uses a service discovery system (such as consul) to discover available nodes. Clients query each node for their set of "tokens": a set of 64-bit integers randomly chosen by the server. When sending a message, the client hashes the topic ID to a 64-bit integer, and selects the node that has the biggest token that is less than the topic hash.
Servers run on TCP port 3443 by default. The protocol is MessagePack encoded arrays of "method calls", framed by a 16-bit integer length.
Each method call is a simple array of values:
Method calls except ":tokens" will also include more arguments:
:send!
calls only, the value being sent. This can be any value that can be encoded in MessagePack.Responses are likewise sent back to the client as arrays of values, laid out as follows:
true
, or the timeout value, for ":send!" calls (so a send will either succeed or time out).The timeout value is encoded as a msgpack extension, with type 0x54 (the character T
).
The value of the extension is a single zero byte.
The method call:
[":recv!", 1, 1000, "foo"]
Would be encoded as bytes (in hexadecimal, including the length field prefix):
001094a63a726563762101cd03e8a3666f6f
The send method call:
[":send!", 2, 1000, "foo", "bar"]
Would be encoded as bytes:
001495a63a73656e642102cd03e8a3666f6fa3626172
The response:
[":recv!", 1, "bar"]
Would be encoded as:
000d93a63a726563762101a3626172
And the response:
[":recv!", 1, timeout-value]
Would be encoded as:
000c93a63a726563762101d45400
The docker compose file can be used to spin up an example cluster of 2 skywalker nodes and 5 consul nodes:
docker compose up -d
You can then start an nREPL server within the same network:
docker run --network skywalker_default -p 7888:7888 rsdio/skywalker-nrepl
Connect to local port 7888 to run a clojure repl. Then you can run:
(require '[clojure.core.async :as async])
(require '[skywalker.client :as client])
(require '[skywalker.core :as s])
(def client1 (async/<!! (client/remote-junction (java.net.InetSocketAddress. "skywalker1" 3443) {})))
(def client2 (async/<!! (client/remote-junction (java.net.InetSocketAddress. "skywalker2" 3443) {})))
(let [recv (s/recv! client1 "foo" {})
send (s/send! client2 "foo" "bar" {})]
(async/<!! (async/into [] (async/merge [recv send]))))
; should print ["bar" true]
Can you improve this documentation?Edit on GitHub
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close