How to write custom publishers

This guide explains how to write a publisher for a new target system. The target system can be anything: from the standard output to a fancy timeseries database, a SQL or NOSQL database, a streaming platform, a cloud system or even a machine learning system.

μ/log tries to make it as simple as possible to write a stateful publisher. Most of the time it is fairly easy to write a publisher which is stateless and doesn't care about downstream system availability. However, real systems and real environments are usually more complex: they have to account for short glitches, speed and throughput.

μ/log tries to do the heavy lifting of managing the publishing buffers in such a way that a downstream system can have transient failures and the events are still sent once it recovers. At the same time, we want to prevent the failure of a downstream system from making our events pile up, which could ultimately cause our system to run out of memory.

For this reason, all the buffers we use are ring-buffers with a limited capacity. If the issue with the downstream system persists over time, older events are automatically dropped to make room for fresher events. Should the downstream system recover at any point, the publisher will be able to send the full content of the buffer.
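To make the drop-oldest behaviour concrete, here is a minimal sketch of a bounded buffer that tags each event with an increasing offset and evicts the oldest entry once it is full. It is not μ/log's implementation (which lives in `com.brunobonacci.mulog.buffer`); the names `empty-ring`, `ring-enqueue` and `ring-items` are hypothetical and chosen only for illustration.

```clojure
;; Hypothetical, simplified model of a bounded ring-buffer:
;; a persistent queue of [offset item] pairs plus the last offset used.
(defn empty-ring
  [capacity]
  {:capacity capacity
   :offset   0
   :queue    clojure.lang.PersistentQueue/EMPTY})

(defn ring-enqueue
  "Appends `item` tagged with the next offset; when the buffer goes
  over capacity, the oldest entry is dropped to make room."
  [{:keys [capacity offset queue] :as ring} item]
  (let [offset' (inc offset)
        queue'  (conj queue [offset' item])]
    (assoc ring
           :offset offset'
           :queue  (if (> (count queue') capacity) (pop queue') queue'))))

(defn ring-items
  "Returns the buffered [offset item] pairs, oldest first (nil if empty)."
  [ring]
  (seq (:queue ring)))

;; Five events pushed into a buffer of capacity 3: the two oldest are dropped.
(ring-items (reduce ring-enqueue (empty-ring 3) [:a :b :c :d :e]))
;; => ([3 :c] [4 :d] [5 :e])
```

Because offsets keep increasing even when entries are evicted, a publisher can always tell which events it has already sent.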

For more information about how μ/log's internals work, please read μ/log internals.

(defprotocol PPublisher
  "Publisher protocol"

  (agent-buffer [this]
    "Returns the agent-buffer where items are sent to, basically your
    inbox.")

  (publish-delay [this]
    "The number of milliseconds between two calls to the `publish` function.
     Return `nil` if you don't want μ/log to call the `publish` function.")

  (publish [this buffer]
    "Publishes the items in the buffer and returns the new state of
    the buffer, which presumably no longer contains the messages
    successfully sent."))

Let's dissect it function by function. The first one is agent-buffer: it returns a Clojure agent wrapping a ring-buffer, which is where the events will be delivered. You should size it according to how events will be pushed to the downstream system; for example, 10000 events is a good size to account for a transient error in the target system without taking too much memory. The buffer namespace already provides an agent wrapping a ring-buffer, so the buffer is almost always created with (rb/agent-buffer 10000) when the publisher is constructed, and agent-buffer simply returns it.

Next we have publish-delay: the delay in milliseconds between two consecutive calls of the publish function. This is usually between 200 milliseconds and a few seconds. The smaller it is, the more frequently the target system will be called; the larger it is, the more buffering space you have to account for. Strike a good balance between DDoS'ing the target system and buffering too much.
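A quick back-of-the-envelope calculation helps when choosing the buffer size and the publish delay. The sketch below assumes a steady event rate (the 100 events per second figure is an illustrative assumption, not a μ/log default): each `publish` call sees roughly rate × delay events, and a full buffer covers capacity ÷ rate seconds of downstream outage before the oldest events start being dropped. Both helper functions are hypothetical.

```clojure
(defn events-per-publish
  "Events accumulated between two `publish` calls, assuming a steady rate."
  [events-per-second publish-delay-ms]
  (/ (* events-per-second publish-delay-ms) 1000))

(defn outage-tolerance-seconds
  "Seconds of downstream outage a buffer of `capacity` events can absorb
  before the oldest events start being dropped, assuming a steady rate."
  [capacity events-per-second]
  (/ capacity events-per-second))

;; Illustrative numbers: 100 events/s, 500 ms delay, 10000-event buffer
(events-per-publish 100 500)          ;; => 50
(outage-tolerance-seconds 10000 100)  ;; => 100
```

With these numbers, each call pushes about 50 events and the buffer can ride out roughly 100 seconds of downtime; doubling the event rate halves that window.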

Finally, publish is the actual call. μ/log will call this function at regular intervals (every publish-delay milliseconds) and pass the content of the buffer. The function has to push the events to the target system and return the new content of the buffer. Calls to this function are serialized (no concurrent calls), so it is important not to take too much time and to set a timeout that is shorter than the publish-delay.
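One way to bound how long a `publish` call can take is to run the remote call on a future and give up after a deadline shorter than the publish-delay. This is a minimal sketch using only `clojure.core`; `call-with-timeout` and `send-to-target!` are hypothetical names, and in practice the timeout settings of your HTTP or database client are usually preferable. Throwing on timeout means `publish` does not return a new buffer, so the events stay there and are retried on the next call.

```clojure
(defn call-with-timeout
  "Runs `f` on another thread and returns its result, or throws an
  ex-info if it does not complete within `timeout-ms` milliseconds."
  [timeout-ms f]
  (let [fut    (future (f))
        result (deref fut timeout-ms ::timeout)]
    (if (= result ::timeout)
      (do
        ;; best effort: interrupt the worker thread
        (future-cancel fut)
        (throw (ex-info "publish timed out" {:timeout-ms timeout-ms})))
      result)))

;; Hypothetical use inside `publish` with a 500 ms publish-delay,
;; where `send-to-target!` stands for your own client call:
;; (call-with-timeout 400 #(send-to-target! (map second items)))
```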

Let's see how to write a simple publisher that writes the events to the console. Let's also add a configurable option: for example, we might want to pretty-print the events ({:pretty-print true}) or simply print them one per line.

(ns my-custom.publisher
  (:require [com.brunobonacci.mulog.buffer :as rb]
            [clojure.pprint :refer [pprint]]))


(deftype MyCustomPublisher [config buffer]

  com.brunobonacci.mulog.publisher.PPublisher
  (agent-buffer [_]
    buffer)

  (publish-delay [_]
    500)

  (publish [_ buffer]
    ;; check our printer option
    (let [printer (if (:pretty-print config) pprint prn)]
      ;; items are pairs [offset <item>]
      (doseq [item (map second (rb/items buffer))]
        ;; print the item
        (printer item)))
    ;; return the buffer minus the published elements
    (rb/clear buffer)))


(defn my-custom-publisher
  [config]
  (MyCustomPublisher. config (rb/agent-buffer 10000)))

That's it! That's all it takes to write a publisher. Now to use it you can start it with:

(require '[com.brunobonacci.mulog :as u])

(u/start-publisher!
  {:type :custom
   :fqn-function "my-custom.publisher/my-custom-publisher"
   :pretty-print true})

That was simple, right? Let's add some complications: instead of printing to the console, we want to write to a file, so we have to keep the file writer at hand (in our state), and we also want to limit how many items we write at once. This is useful when the target system can only receive a limited number of items per request.

First, let's make a version of pprint which outputs to a string.

(defn- pprint-str
  [v]
  (with-out-str
    (pprint v)))

Now let's amend MyCustomPublisher to accept the file writer and to push at most :max-items items (1000 by default) per call.

(deftype MyCustomPublisher [config buffer ^java.io.Writer filewriter]

  com.brunobonacci.mulog.publisher.PPublisher
  (agent-buffer [_]
    buffer)


  (publish-delay [_]
    500)


  (publish [_ buffer]
    ;; check our printer option
    (let [printer (if (:pretty-print config) pprint-str prn-str)
          ;; take at most `:max-items` items
          items (take (:max-items config) (rb/items buffer))
          ;; save the offset of the last item
          last-offset (-> items last first)]
      (if-not (seq items)
        ;; nothing to publish: return the buffer unchanged
        buffer
        (do
          ;; write the items to the file
          (doseq [item (map second items)]
            ;; print the item
            (.write filewriter ^String (printer item)))
          ;; flush the file writer
          (.flush filewriter)
          ;; return the buffer minus the published elements
          (rb/dequeue buffer last-offset)))))


  ;; If you need to release/close resources when the publisher
  ;; is stopped, you can implement `java.io.Closeable`: its `close`
  ;; method will be called as the last call when the publisher is stopped.
  java.io.Closeable
  (close [_]
    (.flush filewriter)
    (.close filewriter)))


;; `io` is `clojure.java.io`: add [clojure.java.io :as io]
;; to the namespace's `:require` list.
(defn my-custom-publisher
  [{:keys [filename] :as config}]
  (let [config (merge {:pretty-print false :max-items 1000} config)]
    (MyCustomPublisher. config (rb/agent-buffer 10000)
                        (io/writer (io/file filename) :append true))))

In the above example, we take only a portion of the buffer content. Importantly, we save the offset of the last item we publish so that we can discard all the messages in the buffer up to and including the last offset. The rest of the publisher is pretty much the same.
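The batching logic can be modelled without μ/log at all. In this sketch a plain vector of `[offset item]` pairs stands in for the buffer, and `dequeue-upto` is a hypothetical stand-in for `rb/dequeue` that drops every entry whose offset is less than or equal to the given one. Since offsets only increase along the buffer, `drop-while` is enough. Only the first `max-items` events are published per call; the rest wait for the next call.

```clojure
(defn dequeue-upto
  "Drops every [offset item] pair with an offset <= `last-offset`.
  Assumes offsets are increasing along `entries`."
  [entries last-offset]
  (vec (drop-while (fn [[offset _]] (<= offset last-offset)) entries)))

(defn publish-batch
  "Publishes at most `max-items` entries with `send!` and returns the
  remaining entries. An empty buffer is returned unchanged."
  [entries max-items send!]
  (let [batch       (take max-items entries)
        last-offset (-> batch last first)]
    (if-not (seq batch)
      entries
      (do
        (send! (map second batch))
        (dequeue-upto entries last-offset)))))

;; With max-items = 2, only :a and :b are sent; [3 :c] stays buffered.
(def sent (atom []))
(publish-batch [[1 :a] [2 :b] [3 :c]] 2 #(swap! sent into %))
;; => [[3 :c]]
@sent
;; => [:a :b]
```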

Error handling

So far we haven't spoken about error handling at all, and that's because there is not much to say. If the publish function raises an exception, there is nothing to worry about: the publish will be retried once the publish-delay interval has passed. So, for example, if you are posting to a remote system and the system is temporarily unavailable, μ/log will keep retrying. Should the target system be unavailable for a longer period of time, there is nothing to worry about either: the buffer will keep filling up until it is full and then start dropping older events, ensuring that the system won't run out of memory.
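Why a failing publish loses nothing can be illustrated with a plain Clojure agent: an agent's state is only replaced by the value an action returns, so if the action throws, the previous state survives. The sketch below uses an agent created with `:error-mode :continue`; that μ/log's `rb/agent-buffer` is configured the same way and runs `publish` as an agent action is an assumption made here for illustration. `demo-buffer`, `failing-publish` and `successful-publish` are hypothetical names.

```clojure
;; A plain agent standing in for the publisher's buffer (illustrative only).
;; With :error-mode :continue, a failing action is discarded and the agent
;; keeps accepting new actions.
(def demo-buffer (agent [:event-1 :event-2] :error-mode :continue))

(defn failing-publish
  "Simulates a publish call while the target system is down."
  [_events]
  (throw (ex-info "target system unavailable" {})))

(defn successful-publish
  "Simulates a publish call that sends everything and empties the buffer."
  [_events]
  [])

(send-off demo-buffer failing-publish)
(await-for 1000 demo-buffer)
@demo-buffer ;; => [:event-1 :event-2], the events are still there

(send-off demo-buffer successful-publish)
(await-for 1000 demo-buffer)
@demo-buffer ;; => []
```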

Support for user-defined transformations

If you are building a general-purpose publisher, it is a good idea to accept a user-defined transformation which is applied to the events. This is very useful for filtering which events you wish to send to a specific publisher or for performing simple event transformations. For example, the transformation could be used to anonymize sensitive fields which you might not want to see in one destination. Sometimes it is also useful to filter noisy events out and send to a particular publisher only the events you are interested in.

All the built-in publishers support custom transformation via the :transform configuration.

If you are implementing a publisher, consider adding this support as well. It is easy: look for a function associated with the :transform key in your configuration and apply it to the sequence of events you get from the buffer.

For example, applied to our first console publisher:

(ns my-custom.publisher
  (:require [com.brunobonacci.mulog.buffer :as rb]
            [clojure.pprint :refer [pprint]]))


(deftype MyCustomPublisher [config buffer]

  com.brunobonacci.mulog.publisher.PPublisher
  (agent-buffer [_]
    buffer)

  (publish-delay [_]
    500)

  (publish [_ buffer]
    ;; check our printer option
    (let [printer (if (:pretty-print config) pprint prn)
          ;; HERE: retrieve the transformation function
          transform (:transform config)]
      ;; items are pairs [offset <item>], APPLY HERE the transform
      (doseq [item (transform (map second (rb/items buffer)))]
        ;; print the item
        (printer item)))
    ;; return the buffer minus the published elements
    (rb/clear buffer)))


(defn my-custom-publisher
  [config]
  ;; if a `transform` function is not provided, then use identity
  (let [config (update config :transform (fn [f] (or f identity)))]
    (MyCustomPublisher. config (rb/agent-buffer 10000))))

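Remember that the transform is optional and is applied to the whole sequence of events at once: it receives the events taken from the buffer and returns the sequence to publish, so it can filter, modify or enrich them.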
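For instance, a user might pass a transform that drops noisy events and strips a sensitive field. The event name `:my-app/heartbeat` and the `:password` field below are hypothetical; `:mulog/event-name` is the key μ/log uses for an event's name. Because the transform receives the whole sequence, ordinary sequence functions compose naturally.

```clojure
(def my-transform
  ;; `comp` applies right to left: first strip the sensitive field,
  ;; then drop the noisy heartbeat events.
  (comp
    (partial remove #(= :my-app/heartbeat (:mulog/event-name %)))
    (partial map #(dissoc % :password))))

(my-transform
  [{:mulog/event-name :my-app/heartbeat}
   {:mulog/event-name :my-app/user-login :user "alice" :password "s3cret"}])
;; => ({:mulog/event-name :my-app/user-login, :user "alice"})

;; Passed to the publisher through its configuration, e.g.:
;; (u/start-publisher!
;;   {:type         :custom
;;    :fqn-function "my-custom.publisher/my-custom-publisher"
;;    :transform    my-transform})
```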

Contributors: Bruno Bonacci, Emlyn Corrin & Arnaud Bos