MQTT broker for Clojure

Rationale

Unfortunately, few if any libraries that are natively suitable for Clojure development implement MQTT server-side functionality.

The Moquette library is a Java MQTT broker based on an eventing model with Netty. It performs well and is designed with embedding support out of the box. Its configuration is compatible with the well-known open-source Mosquitto MQTT server.

Combined, these tools open the way to painless M2M communication for services written in Clojure.

As always, we prefer to keep things tiny and as simple as possible.

Compatibility

The following implementations have passed compatibility tests:

  • Moquette

  • Mosquitto

  • Paho

  • MQTT.js

  • DKD/Brownie

We believe any other tool that conforms to the MQTT specification should pass too.

Usage

The usage pattern is simple:

  • define handlers

  • create broker instance

  • start the service

  • start serving messages

Defining handlers

The handler record should implement the InterceptHandler interface:

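;; Assumes InterceptHandler and StandardCharsets are imported and that
;; `log` aliases a logging namespace such as clojure.tools.logging.
(defrecord BasicHandler [id]
  InterceptHandler
  ;; Decode the payload as UTF-8 and log it together with its topic.
  (onPublish [_ msg]
    (let [payload (-> msg .getPayload (.toString StandardCharsets/UTF_8))]
      (log/info "Received on topic:" (.getTopicName msg) "content:" payload)))
  (getID [_] id)
  ;; Register this handler for every message type.
  (getInterceptedMessageTypes [_] InterceptHandler/ALL_MESSAGE_TYPES))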

The full set of methods that can be overridden:

  1. onPublish

  2. onConnect

  3. onDisconnect

  4. onConnectionLost

  5. onSubscribe

  6. onUnsubscribe

  7. onMessageAcknowledged

  8. getID

  9. getInterceptedMessageTypes

The ALL_MESSAGE_TYPES constant lists every message type a handler can intercept.
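A handler does not have to intercept everything. The following is a minimal sketch, not the library's own code, of a handler that asks only for connect and publish events. It assumes Moquette is on the classpath, that the message classes live in io.moquette.interception.messages, and that the interface declares exactly the methods listed above. The names example.handlers and SelectiveHandler are hypothetical.

```clojure
(ns example.handlers
  (:import (io.moquette.interception InterceptHandler)
           (io.moquette.interception.messages InterceptConnectMessage
                                              InterceptPublishMessage)
           (java.nio.charset StandardCharsets)))

;; Hypothetical handler: reacts to connect and publish events only.
(defrecord SelectiveHandler [id]
  InterceptHandler
  (getID [_] id)
  ;; The interface expects a Java array of classes, hence into-array.
  (getInterceptedMessageTypes [_]
    (into-array Class [InterceptConnectMessage InterceptPublishMessage]))
  (onConnect [_ msg]
    (println "client connected:" (.getClientID msg)))
  (onPublish [_ msg]
    (println "publish on" (.getTopicName msg) "->"
             (-> msg .getPayload (.toString StandardCharsets/UTF_8))))
  ;; The remaining callbacks are deliberate no-ops.
  (onDisconnect [_ _msg] nil)
  (onConnectionLost [_ _msg] nil)
  (onSubscribe [_ _msg] nil)
  (onUnsubscribe [_ _msg] nil)
  (onMessageAcknowledged [_ _msg] nil))
```

An instance such as (SelectiveHandler. "My Instance") can then be passed wherever the examples in this document use BasicHandler.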

Creating instance

The library contains a default SimpleBroker implementation, written in Java, whose constructor takes the name of a configuration file located in the resources.

To manage the instance comfortably, pass it as a parameter to the Clojure record that implements the CljBroker protocol.

CljBroker protocol definition
(defprotocol CljBroker
  (start [o ^InterceptHandler handlers])
  (open [o ^InterceptHandler handlers])
  (stop [o])
  (close [o])
  (send [o from to data qos retain?]))

So instantiating the complete Broker looks like this:

(def config-name "my-broker-settings.conf")
(def srv-java (SimpleBroker. config-name))
(def srv-clj (Broker. srv-java))

Starting the service

The Clojure interface supports two approaches:

  1. controlling the instance by calling the start/stop methods (which corresponds fully to its Java interface)

  2. controlling the instance with the with-open macro

start/stop
...
(let [srv (Broker. (SimpleBroker. config-name))]
    (start srv (BasicHandler. "My Instance"))
    ;; your code here
    (stop srv))
...
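With the start/stop approach, an exception thrown by the code in between would skip the call to stop. A common remedy is to wrap that code in try/finally. The sketch below illustrates the pattern with a hypothetical stand-in (Service, FakeService) instead of the real Broker, so it runs without the library. Only the shape of the let body is meant to carry over.

```clojure
(ns example.start-stop)

;; Hypothetical stand-in for the start/stop half of CljBroker.
(defprotocol Service
  (start [o handler])
  (stop [o]))

;; Tracks its lifecycle in an atom so the effect of stop is observable.
(defrecord FakeService [state]
  Service
  (start [this _handler] (reset! state :running) this)
  (stop [this] (reset! state :stopped) this))

(def service-state (atom :new))

(def outcome
  (let [srv (->FakeService service-state)]
    (start srv "My Instance")
    (try
      ;; User code that fails part-way through.
      (throw (ex-info "failure in user code" {}))
      (catch clojure.lang.ExceptionInfo e
        (.getMessage e))
      (finally
        ;; Runs whether or not the body threw.
        (stop srv)))))

;; @service-state is now :stopped, and outcome holds the exception message.
```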
with-open
...
(with-open [srv (Broker. (SimpleBroker. config-name))]
    (start srv (BasicHandler. "My Instance")))

;; or even

(def config-name "my-broker-settings.conf")
(def srv-java (SimpleBroker. config-name))
(def srv-clj (Broker. srv-java))

(with-open [srv (open srv-clj (BasicHandler. "My Instance"))]
    (comment "Do your stuff here"))
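The with-open macro calls the close method of each bound value when the body exits, normally or by exception. The sketch below shows that order of calls using a hypothetical stand-in (Lifecycle, FakeBroker), not the library's Broker. It assumes, as the example above suggests, that open returns the object to be closed.

```clojure
(ns example.lifecycle)

;; Hypothetical stand-in for the open/close half of CljBroker.
(defprotocol Lifecycle
  (open [o handler])
  (close [o]))

;; Records each call in an atom so the order can be inspected afterwards.
(defrecord FakeBroker [events]
  Lifecycle
  (open [this handler]
    (swap! events conj [:open handler])
    this)                        ; return the record so with-open can bind it
  (close [_]
    (swap! events conj [:close])
    nil))

(def events (atom []))

(with-open [srv (open (->FakeBroker events) "My Instance")]
  (swap! (:events srv) conj [:body]))

;; @events is now [[:open "My Instance"] [:body] [:close]]
```

Because with-open reaches close through Java interop, a record that implements the protocol inline, as FakeBroker does here, is enough for the macro to find the method.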

License

© 2022 Fern Flower Lab

Distributed under the MIT License.
