How to write custom publishers

This guide explains how to write a publisher for a new target system. The target system can be anything: from the standard output to a fancy timeseries database, a SQL or NoSQL database, a streaming platform, a cloud service or even a machine learning system.

μ/log tries to make it as simple as possible to write a stateful publisher. Most of the time it is fairly easy to write a publisher which is stateless and doesn't care about downstream system availability. However, we know that real systems and real environments are usually more complex: they have to account for short-lived glitches, speed and throughput.

μ/log tries to do the heavy lifting of managing the publishing buffers in such a way that the downstream system can suffer a transient failure and still receive the events once it recovers. At the same time, we want to avoid that the failure of a downstream system makes our events pile up, which could ultimately cause our system to run out of memory.

For this reason all the buffers we use are ring-buffers with a limited capacity. If the issue with the downstream system persists, older events are automatically dropped to make room for fresher events. Should the downstream system recover at any point, the publisher will be able to send the full content of the buffer.
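
To get a feel for this behaviour, here is a small REPL sketch. It assumes the buffer namespace also exposes ring-buffer and enqueue, which are used here purely for illustration (see the μ/log internals page for the full buffer API).

(require '[com.brunobonacci.mulog.buffer :as rb])

;; a ring-buffer which holds at most 3 items
(def buf
  (-> (rb/ring-buffer 3)
      (rb/enqueue :event1)
      (rb/enqueue :event2)
      (rb/enqueue :event3)
      (rb/enqueue :event4)))

;; items are pairs of [offset item]; only the newest 3 survive,
;; the oldest item has been dropped to make room
(map second (rb/items buf))
;; => (:event2 :event3 :event4)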

For more information about how the μ/log's internals work, please read μ/log internals.

(defprotocol PPublisher
  "Publisher protocol"

  (agent-buffer [this]
    "Returns the agent-buffer where items are sent to, basically your
    inbox.")

  (publish-delay [this]
    "The number of milliseconds between two calls to the `publish` function.
     Return `nil` if you don't want mu/log to call the `publish` function")

  (publish [this buffer]
    "publishes the items in the buffer and returns the new state of
    the buffer which presumably doesn't contain the messages
    successfully sent.")

  )

Let's dissect it function by function. The first one is agent-buffer: it returns a Clojure agent wrapping a ring-buffer, which is where the events will be delivered. Depending on how fast events can be pushed to the downstream system, you should size this buffer accordingly. For example, 10000 events is a good size to account for a transient error in the target system without taking up too much memory. We already have an agent wrapping a ring-buffer, so in most cases this function will simply return (rb/agent-buffer 10000).

Next we have publish-delay: this is the delay in milliseconds between two consecutive calls of the publish function. This is usually between 200 milliseconds and a few seconds. The smaller it is, the more frequently the target system will be called. The larger it is, the more buffering space you have to account for. Strike a good balance between DDOS'ing the target system and buffering too much.

Finally, publish is the actual call. μ/log will call this function at regular intervals (publish-delay) and pass it the content of the buffer. The function has to push the events to the target system and return the new content of the buffer. Calls to this function are serialized (no concurrent calls), so it is important not to take up too much time and to set a timeout that is smaller than the publish-delay.
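
One simple way to bound the time spent inside publish is to run the downstream call in a future and deref it with a timeout smaller than the publish-delay. The sketch below shows how the publish method of your deftype could look; post-events! is a hypothetical function standing in for whatever call your target system requires.

  (publish [_ buffer]
    (let [items  (map second (rb/items buffer))
          ;; post-events! is a hypothetical call to the target system;
          ;; give up after 400ms so we stay well within the publish-delay
          result (deref (future (post-events! items)) 400 ::timeout)]
      (if (= result ::timeout)
        ;; keep the items and retry at the next publish-delay interval
        buffer
        ;; all items sent: return the empty buffer
        (rb/clear buffer))))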

Let's see how to write a simple publisher that writes the events to the console. Let's also add a configurable option: for example we might want to be able to pretty-print ({:pretty-print true}) the events or simply print them one per line.

(ns my-custom.publisher
  (:require [com.brunobonacci.mulog.buffer :as rb]
            [clojure.pprint :refer [pprint]]))


(deftype MyCustomPublisher
    [config buffer]

  com.brunobonacci.mulog.publisher.PPublisher
  (agent-buffer [_]
    buffer)

  (publish-delay [_]
    500)

  (publish [_ buffer]
    ;; check our printer option
    (let [printer (if (:pretty-print config) pprint prn)]
      ;; items are pairs [offset <item>]
      (doseq [item (map second (rb/items buffer))]
        ;; print the item
        (printer item)))
    ;; return the buffer minus the published elements
    (rb/clear buffer)))


(defn my-custom-publisher
  [config]
  (MyCustomPublisher. config (rb/agent-buffer 10000)))

That's it! That's all it takes to write a publisher. Now to use it you can start it with:

(u/start-publisher!
  {:type :custom
   :fqn-function "my-custom.publisher/my-custom-publisher"
   :pretty-print true})
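
Putting it all together, a quick REPL session might look like this (the event name and data are just examples):

(require '[com.brunobonacci.mulog :as u])

;; start the custom publisher defined above
(def stop-publisher
  (u/start-publisher!
    {:type :custom
     :fqn-function "my-custom.publisher/my-custom-publisher"
     :pretty-print true}))

;; log an event; it will be pretty-printed to the console
;; within the next publish-delay interval (500ms)
(u/log ::user-logged-in :user-id 42)

;; when done, stop the publisher
(stop-publisher)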

That was simple, right? Let's add some complications: instead of printing to the console we want to write to a file, so we have to keep the file writer at hand (in our state), and we also want to limit how many items we publish at once. This might be useful if the target system can only receive a limited number of items per request.

First, let's make a version of pprint which outputs to a string.

(defn- pprint-str
  [v]
  (with-out-str
    (pprint v)))

Now let's amend MyCustomPublisher to accept the file writer and push at most 1000 items at a time. Note that this version also needs [clojure.java.io :as io] in the namespace :require.

(deftype MyCustomPublisher
  [config buffer ^java.io.Writer filewriter]


  com.brunobonacci.mulog.publisher.PPublisher
  (agent-buffer [_]
    buffer)


  (publish-delay [_]
    500)


  (publish [_ buffer]
    ;;    check our printer option
    (let [printer (if (:pretty-print config) pprint-str prn-str)
          ;; take at most `:max-items` items
          items (take (:max-items config) (rb/items buffer))
          ;; save the offset of the last items
          last-offset (-> items last first)]
      ;; write the items to the file
      (doseq [item (map second items)]
        ;; print the item
        (.write filewriter (printer item)))
      ;; flush the buffer
      (.flush filewriter)
      ;; return the buffer minus the published elements
      (rb/dequeue buffer last-offset))))


(defn my-custom-publisher
  [{:keys [filename] :as config}]
  (let [config (merge {:pretty-print false :max-items 1000} config)]
    (MyCustomPublisher. config (rb/agent-buffer 10000)
                        (io/writer (io/file filename) :append true))))
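
Starting it works exactly as before, with the extra options passed through the configuration map (the :filename value below is just an example path):

(u/start-publisher!
  {:type :custom
   :fqn-function "my-custom.publisher/my-custom-publisher"
   :filename "/tmp/mulog-events.log"
   :pretty-print true
   :max-items 1000})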

In the above example we take only a portion of the buffer content. Importantly, we save the offset of the last item we publish so that we can discard all the messages in the buffer up to and including that offset. The rest of the publisher is pretty much the same.
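
To make the offset handling concrete, here is an illustrative sketch of what the buffer operations return (the offsets and event maps shown are made up):

;; rb/items returns pairs of [offset item], in insertion order
(rb/items buffer)
;; => ([17 {:mulog/event-name ::user-logged-in  ...}]
;;     [18 {:mulog/event-name ::page-viewed     ...}]
;;     [19 {:mulog/event-name ::user-logged-out ...}])

;; after publishing the first two items, drop everything in the
;; buffer up to and including offset 18; offset 19 stays behind
(rb/dequeue buffer 18)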

Error handling

So far we haven't spoken about error handling at all, and that's because there is not much to say. If the publish function raises an exception, nothing to worry about: the publish will be retried after the publish-delay interval has passed. So, for example, if you are posting to a remote system and the system is temporarily unavailable, μ/log will keep retrying. Should the target system be unavailable for a longer period of time, there is nothing to worry about in this case either: the buffer will keep filling up until it is full and then start dropping older events, ensuring that the system won't run out of memory.
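
In practice this means a publisher for a remote system can simply let exceptions bubble up. In the sketch below, post-events! is a hypothetical call to the target system:

  (publish [_ buffer]
    ;; if post-events! throws (e.g. the endpoint is unreachable),
    ;; the buffer is left untouched and μ/log retries the whole
    ;; publish after the next publish-delay interval
    (post-events! "https://events.example.com/ingest"
                  (map second (rb/items buffer)))
    ;; only reached when the post succeeded: discard the published items
    (rb/clear buffer))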
