[com.dkdhub/clj-mqtt-broker "0.0.8"]
Unfortunately there are few to nothing libraries, natively suitable to be used in Clojure development that implement MQTT server side functionalities.
The Moquette library is Java MQTT broker based on an event model by Netty. The library has a good performance and is designed with embedding support available out of the box. Its configuration is compatible with well known Mosquitto Open Source MQTT server.
When being combined - these tools open the way to painless M2M communications for services written in Clojure.
As always we prefer to keep things as tiny and as simple as it ever possible, so the library interface is very ascetic and pure.
The following implementations are successfully passed compatibility tests:
Moquette
Mosquitto
Paho
MQTT.js
DKD/Brownie
any other tool that conform MQTT specs should pass too, we believe.
Add the following dependency into your project.clj’s `:dependecies section
[com.dkdhub/clj-mqtt-broker "0.0.8"]
include the library into your code, like:
(ns my.namespace
(:require [clj-mqtt-broker.core :as mqtt-core])
(:import (clj_mqtt_broker.core Broker)
(com.dkdhub.mqtt_broker SimpleBroker)))
and start coding.
(defrecord BasicHandler [id]
InterceptHandler
(onPublish [_ msg]
(log/info "got a message")
(let [payload (-> msg .getPayload (.toString StandardCharsets/UTF_8))]
(log/info (str "Received on topic: " (.getTopicName msg) " content: " payload))))
(onConnect [_ _msg])
(onDisconnect [_ _msg])
(onConnectionLost [_ _msg])
(onSubscribe [_ _msg])
(onUnsubscribe [_ _msg])
(onMessageAcknowledged [_ _msg])
(onSessionLoopError [_ _error])
(getID [_] id)
(getInterceptedMessageTypes [_] InterceptHandler/ALL_MESSAGE_TYPES))
(def config-name "moquette.conf")
(let [b (Broker. (SimpleBroker. config-name))]
(with-open [srv (open b (BasicHandler. "12345"))]
;; do what ever your need here upon sending
(send srv "MY-SERVER" "/MY_TOPIC" "TEST MESSAGE" 1 false)
;; do what ever you need here after sending
;; once you leave the `with-open` closure - the instance will be stopped automatically
;; once you leave the `let` statement - the instance will be destroyed automatically
))
The usage pattern is clear:
define handlers
create broker instance
start the service
start serving messages
That is - no complexity and no headaches, at all.
The handlers record should implement InterceptHandler interface
(defrecord BasicHandler [id]
InterceptHandler
(onPublish [_ msg]
(let [payload (-> msg .getPayload (.toString StandardCharsets/UTF_8))]
(log/info (str "Received on topic: " (.getTopicName msg) " content: " payload))))
(onConnect [_ _msg])
(onDisconnect [_ _msg])
(onConnectionLost [_ _msg])
(onSubscribe [_ _msg])
(onUnsubscribe [_ _msg])
(onMessageAcknowledged [_ _msg])
(onSessionLoopError [_ _error])
(getID [_] id)
(getInterceptedMessageTypes [_] InterceptHandler/ALL_MESSAGE_TYPES))
The full set of methods to be overridden:
onPublish
onConnect
onDisconnect
onConnectionLost
onSubscribe
onUnsubscribe
onMessageAcknowledged
onSessionLoopError
getID
getInterceptedMessageTypes
The ALL_MESSAGE_TYPES vector contains a full set of the related messages types.
The library contains default SimpleBroker implementation written in Java that requires resources' configuration file name to be passed into constructor.
In order to manage the instance comfortably it should be passed as a parameter into the Clojure record implements CljBroker interface.
(defprotocol CljBroker
(start [o ^InterceptHandler handlers])
(open [o ^InterceptHandler handlers])
(stop [o])
(close [o])
(send [o from to data qos retain?])
(clients [o])
(disconnect [o client] [o client flush?]))
so the instantiation of the complete Broker is looks like:
(def config-name "my-broker-settings.conf")
(def srv-java (SimpleBroker. config-name))
(def srv-clj (Broker. srv-java))
port 1883
host 0.0.0.0
allow_anonymous true
telemetry_enabled false
The Clojure interface supports two approaches:
controlling the instance by calling start/stop methods (that fully corresponds to its Java interface)
controlling the instance by with-open macro
...
(let [srv (Broker. (SimpleBroker. config-name))]
(start srv (BasicHandler. "My Instance"))
;; your code here
(stop srv))
...
...
(with-open [srv (Broker. (SimpleBroker. config-name))]
(start srv (BasicHandler. "My Instance")))
;; or even
(def config-name "my-broker-settings.conf")
(def srv-java (SimpleBroker. config-name))
(def srv-clj (Broker. srv-java))
(with-open [srv (open srv-clj (BasicHandler. "My Instance"))]
(comment "Do your stuff here"))
For more examples see test sources.
For more control, use AdvancedBroker with mqtt-config:
(ns my.namespace
(:require [clj-mqtt-broker.core :as mqtt])
(:import (clj_mqtt_broker.core Broker)
(com.dkdhub.mqtt_broker AdvancedBroker)))
(def config (mqtt/mqtt-config {:port-tcp 1883 :anonymous? true}))
(with-open [srv (mqtt/open (Broker. (AdvancedBroker. config))
(BasicHandler. "my-handler"))]
(mqtt/send srv "MY-SERVER" "/TOPIC" "payload" :exactly false))
| Key | Default | Description |
|---|---|---|
|
| TCP port for MQTT |
|
| WebSocket port |
|
| Bind address |
|
| Path to password file |
|
| Allow anonymous connections |
|
| WebSocket path |
| none |
|
|
| Persistence data directory |
|
| Max MQTT message size in bytes |
| none | Allow MQTT5 zero-byte client IDs |
| none | Buffer flush interval in milliseconds |
| none | Use native transport (kqueue/epoll) |
© 2022-2025 Fern Flower Lab
Distributed under the MIT Licence.
Can you improve this documentation? These fine people already did:
MelKori, A I & source-cEdit on GitHub
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |