Franz Kafka's favorite sister
All of the examples assume the following require:
(require '[ottla.core :as ottla])
The primary API operations take a config, which has at least the following keys:
:schema - The postgres schema for the ottla logs and subscription tables.
:conn-map - A map of parameters for connecting to the DB.
Additionally, the administrative operations (create/remove topic, etc.) require a connected config, which is a config like the above that includes a :conn key for the active database connection.
Unless you need more control, though, the following pattern is the easiest way to build and use this config. To set up a new ottla system in a database:
(ottla/with-connected-config [config (ottla/make-config {:database "fancy"
                                                         :user "bob"
                                                         :password "the-password"})]
  (ottla/init! config))
This connects to the postgres database and creates the necessary schema and tables within it.
After ottla has been initialized with the ottla/init! operation, topics can be created with add-topic!:
(ottla/with-connected-config [config {,,,}]
  (ottla/add-topic! config "my-new-topic"))
Existing topics can be removed with remove-topic!:
(ottla/with-connected-config [config {,,,}]
  (ottla/remove-topic! config "my-new-topic"))
Note that this is a destructive action, and all data from the topic will be removed immediately.
To add to a topic's log, call append! with records, which are maps with the following keys:
:key - data key
:value - data value
:meta - A map of optional metadata (analogous to Kafka headers)
The key and value are stored in binary format, so a key and/or value serializer must be provided if the key or value is not already binary data. A serializer is a function that takes a key or value and returns a binary representation that can later be deserialized by consumers. As a simple example, here's an edn serializer and deserializer:
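(require 'clojure.edn)
(def charset java.nio.charset.StandardCharsets/UTF_8)
;; Print obj as edn text and encode it as UTF-8 bytes.
(defn serialize-edn
  [obj]
  (.getBytes (pr-str obj) charset))
;; Decode a UTF-8 byte array and read a single edn value from it.
(defn deserialize-edn
  [ba]
  (with-open [rdr (java.io.PushbackReader.
                   (java.io.InputStreamReader.
                    (java.io.ByteArrayInputStream. ba)
                    charset))]
    (clojure.edn/read rdr)))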
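A quick REPL round trip is one way to sanity-check a serializer pair before wiring it into ottla. The sketch below restates the two functions in compact form so it can be pasted on its own; it uses clojure.edn/read-string on a decoded String instead of a reader, which is a substitution made here for brevity and not something the library requires. The sample map is made up for illustration.

```clojure
(require '[clojure.edn :as edn])

(def ^java.nio.charset.Charset charset
  java.nio.charset.StandardCharsets/UTF_8)

(defn serialize-edn
  "Print obj as edn text and encode it as UTF-8 bytes."
  [obj]
  (.getBytes ^String (pr-str obj) charset))

(defn deserialize-edn
  "Decode UTF-8 bytes and read one edn value."
  [^bytes ba]
  (edn/read-string (String. ba charset)))

;; Illustrative sample value, not taken from the ottla docs.
(def sample {:oh "cool" :n 42 :tags #{:a :b}})

;; The serialized form is a byte array, and reading it back
;; yields a value equal to the original.
(def round-tripped (deserialize-edn (serialize-edn sample)))
```

Any value that pr-str prints as readable edn should survive this round trip; values that print as unreadable objects (for example, functions) will not.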
So, assuming these serializing functions, here's how we might insert edn data into a topic:
(ottla/append! config "my-new-topic"
               [{:value {:oh "cool"}} ,,,]
               {:serialize-value serialize-edn})
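Records are plain maps, so they can be built with ordinary Clojure before being handed to append!. The sketch below is only an illustration: the event shape, the event->record helper, and the contents of the :meta map are all hypothetical, and only the :key, :value, and :meta keys come from the description above.

```clojure
;; Hypothetical domain events; the shape is made up for this example.
(def events
  [{:user-id 1 :email "a@example.com"}
   {:user-id 2 :email "b@example.com"}])

(defn event->record
  "Build a record map with the :key, :value and :meta keys described above."
  [event]
  {:key   (:user-id event)            ; not binary, so it would need a key serializer
   :value event                       ; not binary, so it would need a value serializer
   :meta  {:source "signup-service"}}) ; illustrative metadata only

(def records (mapv event->record events))
```

The resulting vector takes the place of the literal record vector in the append! call above. Because these keys are not binary data, a key serializer would be needed in addition to the value serializer.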
Consumers are designed to be run in a managed Consumer object, which can be started like this:
(ottla/start-consumer config {:topic "my-new-topic"} handler {:deserialize-value deserialize-edn})
This will spin up and return a Consumer that should be kept around until you are ready to stop it with (.close consumer). Consumers will usually be long-lived and can be managed with whatever component lifecycle framework you choose, but they can also be used with with-open for short-lived consumers.
A Consumer will maintain 2 database connections: one solely for listening for real time notifications from the database, and one for periodic fetching of records. This latter connection can be reused in handlers.
A handler is a function of 2 args, the ottla connected config and a vector of records. It will be called on its own Thread, but will receive records in order.
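As a minimal sketch of the two-argument shape described above, the handler below ignores the connected config and collects record values into an atom. It assumes that records delivered to a handler carry a :value key like the records passed to append!; the seen atom and the hand-built record are purely illustrative, and the direct call at the end stands in for what a Consumer would do.

```clojure
;; Illustrative sink for received values.
(def seen (atom []))

(defn handler
  "Takes the connected config and a vector of records."
  [_config records]
  ;; Assumes each record is a map with a :value key.
  (swap! seen into (map :value) records))

;; Exercise the handler directly with a hand-built record, no database needed.
(handler nil [{:key nil :value {:oh "cool"} :meta {}}])
```

A function like this would be passed as the handler argument to start-consumer. Since it runs on its own thread, any state it touches should be safe to share, which is why an atom is used here.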
The options arg accepts the following keys:
:poll-ms - how often, in milliseconds, to poll the database for new records on the topic. This is primarily used as a fallback in case of LISTEN/NOTIFY misses. (default 15000)
:await-close-ms - when closing the Consumer, how long to wait for all threads to completely finish their work before shutting it down forcibly.
:deserialize-key - a deserializer for the record keys
:deserialize-value - a deserializer for the record values
:exception-handler - a function that will receive any uncaught Exception object (see below)
:xform - optional transducer for records (applied after deserialization)
Uncaught exceptions thrown in the handler, in any deserializer, or while fetching will be caught and fed to the exception-handler, which by default just prints the exception, but could instead log it or act on it in some other way. If this exception-handler returns the keyword :ottla/shutdown, the Consumer will begin its shutdown process.
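The options above might be assembled like this. It is a sketch under several assumptions: the exception handler is taken to be a function of one argument (the exception), the :fatal? flag in ex-data is a made-up convention for deciding when to shut down, the transducer is assumed to see one record map at a time, and the numeric values are arbitrary.

```clojure
(defn exception-handler
  "Print the error to stderr; ask the Consumer to shut down only when the
  exception is marked fatal. :fatal? is a hypothetical convention."
  [^Throwable e]
  (binding [*out* *err*]
    (println "consumer error:" (.getMessage e)))
  (when (:fatal? (ex-data e))
    :ottla/shutdown))

;; Transducer that drops records whose :value is nil.
(def skip-empty (remove (comp nil? :value)))

;; Option keys come from the list above; the values are illustrative.
(def consumer-opts
  {:poll-ms           5000
   :await-close-ms    2000
   :exception-handler exception-handler
   :xform             skip-empty})
```

A map like consumer-opts, with deserializers added as needed, would be passed as the final argument to start-consumer. Returning nil from the exception handler for non-fatal errors keeps the Consumer running.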