This library is a Clojure wrapper over the Aerospike Java library. The Java Documentation can be found here.
For a complete reference of Aerospike Java client internals I warmly recommend reading the Java client examples.
This library wraps the Java code, hides all Clojure Java interop and supplies a more functional approach to Aerospike. It also adds a few handy features.
A note worth mentioning at the beginning is that the Java client has both synchronous and asynchronous APIs. This library however, only wraps the asynchronous ones. If a synchronous behaviour is still desired, it can be be easily achieved, as shown below.
Also, although possible to combine several namespaces in a single cluster, this library uses a single client object per cluster/client.
For this tutorial we will take advantage of a locally deployed Aerospike DB with docker container. The following command should download and install a docker image with the latest Aerospike image on your machine.
$ sudo docker run -d --name aerospike -p 3000:3000 -p 3001:3001 -p 3002:3002 -p 3003:3003 aerospike"
The DB created contains a single namespace named test
. On production, Aerospike
is a distributed cluster composed of several nodes.
In order to follow the example further in this tutorial, you should run a Clojure REPL with the library located in the classpath. This can be done:
$ git clone https://github.com/AppsFlyer/aerospike-clj.git
$ cd aerospike-clj
$ lein repl
$ lein try aerospike-clj "0.2.1"
...
user=> (require '[aerospike-clj.client :as aero])
nil
profiles.clj
and running a repl there:(defproject af-common-rta-aerospike "3.0.0"
:dependencies [[aerospike-clj "2.0.0"]]
; ...
)
First, let's create a client:
user=> (require '[aerospike-clj.client :as aero])
nil
user=> (def c (aero/init-simple-aerospike-client ["localhost"] "test"))
;; Starting aerospike clients for clusters localhost with username null
#'user/c
user=>
That's it. A client creation requires only a vector of servers to bootstrap from
and a namespace name. Behind the scenes the client constructor is called with a
default ClientPolicy
.
In order to further configure the client for your needs pass a configuration map
as the last argument. The map is a flat map of string keys, corresponding to the
ClientPolicy
class. To use the Java enumerations, simply uppercase the first character:
user=> (def c (aero/init-simple-aerospike-client ["localhost"]
"test"
{"failIfNotConnected" true
"AuthMode" "INTERNAL"
"username" nil
"password" nil
"maxCommandsInProcess" 0}))
;; Starting aerospike clients for clusters localhost with username null
#'user/c
Currently, almost all configuration (except default batch/info/query/scan policies) can be passed to
init-simple-aerospike-client
in the conf
map. This includes ClientPolicy
and EventPolicy
configurations, flattened together (e.g maxCommandsInProcess
) :
The Java client can cache all API related policies for you, and then uses them when a
nil
policy is passed to the API call. This is worthwhile because most of the time,
the same policies are used throughout the application lifetime. Those caches are
initiated with the (mostly sane) defaults. You can change them by creating custom ones:
user=> (require '[aerospike-clj.policy :as policy])
nil
user=> (def new-wp (policy/map->write-policy {"CommitLevel" "COMMIT_MASTER" "sendKey" true}))
#'user/new-wp
user=> (.commitLevel new-wp)
Reflection warning...
#object[com.aerospike.client.policy.CommitLevel 0x2f1f3fef "COMMIT_MASTER"]
user=> (.sendKey new-wp)
Reflection warning...
true
The functions map->policy
and map->write-policy
are slow due to reflection and
are here to save you some Java interop. Since they are slow, do not use them
to create one-time-use policies for each API call. Use them just to tweak the defaults:
user=> (def c (aero/init-simple-aerospike-client ["localhost"] "test" {"writePolicyDefault" new-wp}))
#'user/c
user=> (.commitLevel (.writePolicyDefault (aero/get-client c)))
#object[com.aerospike.client.policy.CommitLevel 0x2f1f3fef "COMMIT_MASTER"]
Those AerospikeClient
fields are final
hence can only be set on client creation,
but you can also have a few useful policies def
ed somewhere and pass them later
under the :policy
key of the API calls.
Querying is only possible (read: easy) via the asynchronous APIs of the client.
aerospike-clj
also converts the callback model of the underlying APIs to a future
based model (using promesa
). This allows
users to configure additional logic to happen when the response returns.
But first, let's see a simple query:
user=> (require '[aerospike-clj.protocols :as pt])
nil
user=> (def f (pt/get-single c "not-there" "set-name"))
user=> (type f)
java.util.concurrent.CompletableFuture
user=> (deref f)
nil
Joy! We got a future answer. In this case, the deferred result is nil
, since the
key "not-there"
is missing. Futures can be composed with logic that happens once
they are delivered (for a complete documentation, see promesa
's docs here.
Behold:
user=> (require '[promesa.core :as p])
nil
user=> (p/then (pt/get-single c "index" "set-name")
#_=> #(if %1 (prn "good!") (prn "not there")))
"not there"
If the record existed we would get an AerospikeRecord
:
user=> (pt/put c "index" "set-name" 42 1000)
#object[java.util.concurrent.CompletableFuture 0x16283d4b "pending"]
user=> (def f (pt/get-single c "index" "set-name"))
#'user/f
user=> (deref f)
#aerospike_clj.client.AerospikeRecord{:payload 42, :gen 1, :ttl 284805805}
user=>
We observe a few new things:
put
API also returns futures. Those will contain the operation's result
when it completes.put
API requires an additional argument, the TTL. Although Aerospike
cluster namespaces have a default TTL, and there is also a default TTL in the
Java client, we chose to make your life harder here. This is because we believe
that TTLs are too important to be default, and programmers must think about them.:payload
with
the value, :gen
for the record generation, and a :ttl
for the record TTL, in
Aerospike format. This resembles the Record class.
In order to convert the TTL to Unix epoch you can call expiry-unix
on it (see below).Aerospike returns a TTL on the queried records that is Epoch style, but with a
different "beginning of time" which is "2010-01-01T00:00:00Z". Call expiry-unix
with the returned TTL to get a TTL reative to UNIX epoch if you want to convert
it later to a more standard timestamp.
user=> (-> f
#_=> deref
#_=> :ttl
#_=> aero/expiry-unix
#_=> java.time.Instant/ofEpochSecond
#_=> str)
"2019-01-10T08:43:25Z"
Let's do it in an asynchronous manner:
user=> (p/chain (pt/get-single c "index" "set-name")
#_=> :ttl
#_=> aero/expiry-unix
#_=> #(java.time.Instant/ofEpochSecond %)
#_=> str
#_=> println)
#object[java.util.concurrent.CompletableFuture 0x4a9620a9 "pending"]
2019-01-10T09:02:45Z
We got a deferred back and some time later the whole chain of composed logic
was triggered. We can also get the result once multiple required records return.
We will simply get a sequence of AerospikeRecord
s once all of them arrive:
user=> (run! #(pt/put c (str %1) "set-name" %1 1000) (range 5))
nil
user=> @(p/then (aero/get-multiple c (map str (range 5)) (repeat "set-name"))
#_=> #(map :payload %))
(0 1 2 3 4)
Since the returned future objects can be easily deref
ed, simply adding a @
before queries makes them synchronous.
The library takes advantage of futures' ability to compose and allows you to configure
a :transcoder
to conveniently set this logic:
get
Transcoders are functions of the AerospikeRecord instance, not the
deferred
value of it.put
Transcoders are functions on the passed payload. They are called before
the request is even put on the event-loop.user=> (pt/put c "index" "set-name" 42 1000)
#object[java.util.concurrent.CompletableFuture 0x4a9620a9 "pending"]
user=> (defn inc-transcoder [rec] (when rec
#_=> (update rec :payload inc)))
#'user/inc-transcoder
user=> (p/chain (pt/get-single c "index" "set-name" {:transcoder inc-transcoder})
#_=> :payload
#_=> println)
#object[java.util.concurrent.CompletableFuture 0x4a9620af "pending"]
43
The transcoder here is a function on the payload itself
user=> (pt/put c "17" "set-name" 1 1000 {:transcoder str})
#object[java.util.concurrent.CompletableFuture 0x4d025d9b "pending"]
user=> @(pt/get-single c "17" "set-name" {:transcoder #(:payload %1)})
"1"
The transcoder option saves some boilerplate and can be easily used to do more useful stuff, like de-serializing or (de)compressing data on the client side.
Can you improve this documentation? These fine people already did:
Ido Barkan, asaf.chelouche & Adi BelanEdit on GitHub
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close