(require '[cbass :refer [new-connection store find-by scan delete]])
(def conf {"hbase.zookeeper.quorum" "127.0.0.1:2181" "zookeeper.session.timeout" 30000})
(def conn (new-connection conf))
By default cbass uses nippy for serialization / deserialization. There are more details about it in the Serialization section. This can be changed by providing your own, optional, pack / unpack functions when creating an HBase connection:
(def conn (new-connection conf :pack identity
:unpack identity))
In this example we are just muting "packing" and "unpacking", relying on custom serialization being done prior to calling cbass (so the data is already a byte array), and on deserialization being done after the value is returned from cbass, since it will just return a byte array back in this case (i.e. the identity function for both).
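Any pair of functions that round-trips your values to and from bytes will do. For example, a minimal sketch using EDN instead of nippy (an assumption for illustration: all stored values are plain Clojure data that pr-str can round-trip):
(require '[clojure.edn :as edn])

(def conn (new-connection conf :pack #(.getBytes (pr-str %))            ;; value -> EDN string -> bytes
                               :unpack #(edn/read-string (String. %)))) ;; bytes -> EDN string -> value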
;; args: conn, table, row key, family, data, [timestamp]
user=> (store conn "galaxy:planet" "earth" "galaxy" {:inhabited? true
:population 7125000000
:age "4.543 billion years"})
Depending on the key strategy/structure, it sometimes makes sense to only store row keys / families without values:
user=> (store conn "galaxy:planet" "pluto" "galaxy")
It is possible to pass a custom timestamp to HBase:
user=> (store conn "galaxy:planet" "earth" "galaxy" {:inhabited? true
:population 7125000000
:age "4.543 billion years"}
1000)
In case there are multiple rows to store in the same table, store-batch can help out:
(store-batch conn "galaxy:planet"
[["mars" "galaxy" {:inhabited? true :population 3 :age "4.503 billion years"}]
["earth" "galaxy" {:inhabited? true :population 7125000000 :age "4.543 billion years"}]
["pluto" "galaxy"]
["neptune" "galaxy" {:inhabited? :unknown :age "4.503 billion years"}]]))
notice the "pluto", it has no columns, which is also fine.
You can pass a custom timestamp on each row:
(store-batch conn "galaxy:planet"
[["mars" "galaxy" {:inhabited? true :population 3 :age "4.503 billion years"} 1000]
["earth" "galaxy" {:inhabited? true :population 7125000000 :age "4.543 billion years"} 2000]
["pluto" "galaxy" nil 3000]
["neptune" "galaxy" {:inhabited? :unknown :age "4.503 billion years"} 4000]]))
There are two primary ways data is found in HBase: getting a single row by its key (find-by) and scanning over many rows (scan).
;; args: conn, table, row key, [family, columns, [time-range]]
user=> (find-by conn "galaxy:planet" "earth")
{:age "4.543 billion years", :inhabited? true, :population 7125000000}
user=> (find-by conn "galaxy:planet" "earth" "galaxy")
{:age "4.543 billion years", :inhabited? true, :population 7125000000}
user=> (find-by conn "galaxy:planet" "earth" "galaxy" #{:age :population})
{:age "4.543 billion years", :population 7125000000}
HBase calls these scanners, hence the scan function name.
Let's first look directly at HBase (shell) to see the data we are going to scan over:
hbase(main):002:0> scan 'galaxy:planet'
ROW COLUMN+CELL
earth column=galaxy:age, timestamp=1440880021543, value=NPY\x00i\x134.543 billion years
earth column=galaxy:inhabited?, timestamp=1440880021543, value=NPY\x00\x04\x01
earth column=galaxy:population, timestamp=1440880021543, value=NPY\x00+\x00\x00\x00\x01\xA8\xAE\xDF@
mars column=galaxy:age, timestamp=1440880028315, value=NPY\x00i\x134.503 billion years
mars column=galaxy:inhabited?, timestamp=1440880028315, value=NPY\x00\x04\x01
mars column=galaxy:population, timestamp=1440880028315, value=NPY\x00d\x03
neptune column=galaxy:age, timestamp=1440880036629, value=NPY\x00i\x134.503 billion years
neptune column=galaxy:inhabited?, timestamp=1440880036629, value=NPY\x00j\x07unknown
3 row(s) in 0.0230 seconds
By default, find-by returns the latest version of a row. If you want to retrieve an earlier version of a cell, you need to pass a :time-range to find-by:
user=> (store conn "galaxy:planet" "earth" "galaxy" {:population 3} 1000)
user=> (store conn "galaxy:planet" "earth" "galaxy" {:population 7125000000} 2000)
user=> (find-by conn "galaxy:planet" "earth" #{:population} :time-range {:from-ms 500 :to-ms 1500})
{:last-updated 1000, :population 3}
HBase scanning is pretty flexible: by row key from/to prefixes, by time ranges, by families/columns, etc.
Here are some examples:
;; args: conn, table, {:row-key-fn, :family, :columns, :from, :to, :time-range {:from-ms :to-ms}}
user=> (scan conn "galaxy:planet")
{"earth"
{:age "4.543 billion years",
:inhabited? true,
:population 7125000000},
"mars" {:age "4.503 billion years", :inhabited? true, :population 3},
"neptune" {:age "4.503 billion years", :inhabited? :unknown}}
By default cbass assumes row keys are strings, but in practice keys are often prefixed and/or hashed. Hence, to read a row key back from HBase, a custom row key function may come in handy:
;; args: conn, table, {:row-key-fn, :family, :columns, :from, :to, :time-range {:from-ms :to-ms}}
user=> (scan conn "galaxy:planet" :row-key-fn #(keyword (String. %)))
{:earth
{:age "4.543 billion years",
:inhabited? true,
:population 7125000000},
:mars {:age "4.503 billion years", :inhabited? true, :population 3},
:neptune {:age "4.503 billion years", :inhabited? :unknown}}
by family:
user=> (scan conn "galaxy:planet" :family "galaxy")
{"earth"
{:age "4.543 billion years",
:inhabited? true,
:population 7125000000},
"mars" {:age "4.503 billion years", :inhabited? true, :population 3},
"neptune" {:age "4.503 billion years", :inhabited? :unknown}}
specifying columns (qualifiers):
user=> (scan conn "galaxy:planet" :family "galaxy"
:columns #{:age :inhabited?})
{"earth" {:age "4.543 billion years", :inhabited? true},
"mars" {:age "4.503 billion years", :inhabited? true},
"neptune" {:age "4.503 billion years", :inhabited? :unknown}}
Data can be scanned by row key prefix using the :from and/or :to keys:
user=> (scan conn "galaxy:planet" :from "ma")
{"mars" {:age "4.503 billion years", :inhabited? true, :population 3},
"neptune" {:age "4.503 billion years", :inhabited? :unknown}}
:to is exclusive:
user=> (scan conn "galaxy:planet" :from "ea"
:to "ma")
{"earth" {:age "4.543 billion years", :inhabited? true, :population 7125000000}}
notice, no Neptune:
user=> (scan conn "galaxy:planet" :to "nep")
{"earth"
{:age "4.543 billion years",
:inhabited? true,
:population 7125000000},
"mars" {:age "4.503 billion years", :inhabited? true, :population 3}}
Starting from hbase-client 0.99.1, cbass can just do :starts-with, in case no :to is needed.
Notice, we added saturday and saturn for a better example:
user=> (scan conn "galaxy:planet")
{"earth"
{:age "4.543 billion years",
:inhabited? true,
:population 7125000000},
"mars" {:age "4.503 billion years", :inhabited? true, :population 3},
"neptune" {:age "4.503 billion years", :inhabited? :unknown},
"pluto" {},
"saturday" {:age "24 hours", :inhabited? :sometimes},
"saturn" {:age "4.503 billion years", :inhabited? :unknown}}
using :starts-with:
user=> (scan conn "galaxy:planet" :starts-with "sa")
{"saturday" {:age "24 hours", :inhabited? :sometimes},
"saturn" {:age "4.503 billion years", :inhabited? :unknown}}
If you look at the data from HBase shell (above), you'll see that every row has a timestamp associated with it.
These timestamps can be used to scan data within a certain time range:
user=> (scan conn "galaxy:planet" :time-range {:from-ms 1440880021544
:to-ms 1440880036630})
{"mars" {:age "4.503 billion years", :inhabited? true, :population 3},
"neptune" {:age "4.503 billion years", :inhabited? :unknown}}
in case :from-ms is missing, it defaults to 0:
user=> (scan conn "galaxy:planet" :time-range {:to-ms 1440880036629})
{"earth"
{:age "4.543 billion years",
:inhabited? true,
:population 7125000000},
"mars" {:age "4.503 billion years", :inhabited? true, :population 3}}
same analogy with :to-ms: if it is missing, it defaults to Long/MAX_VALUE:
user=> (scan conn "galaxy:planet" :time-range {:from-ms 1440880036629})
{"neptune" {:age "4.503 billion years", :inhabited? :unknown}}
Here is a regular table scan with all the defaults:
user=> (scan conn "galaxy:planet")
{"earth" {:age "4.543 billion years", :inhabited? true, :population 7125000000},
"mars" {:age "4.503 billion years", :inhabited? true, :population 3},
"neptune" {:age "4.503 billion years", :inhabited? :unknown}}
many times it makes sense to scan a table in reverse order, to get to the latest updates first without scanning the whole search space:
user=> (scan conn "galaxy:planet" :reverse? true)
{"neptune" {:age "4.503 billion years", :inhabited? :unknown},
"mars" {:age "4.503 billion years", :inhabited? true, :population 3},
"earth" {:age "4.543 billion years", :inhabited? true, :population 7125000000}}
Since scanning partially gets its name from a "table scan", in many cases it may return quite large result sets. Often we'd like to limit the number of rows returned, but HBase does not make this simple, for various reasons.
cbass makes it quite simple to limit the number of rows returned by using a :limit key:
user=> (scan conn "galaxy:planet" :limit 2)
{"earth" {:age "4.543 billion years", :inhabited? true, :population 7125000000},
"mars" {:age "4.503 billion years", :inhabited? true, :population 3}}
For example, to get the latest 3 planets added, we can scan in reverse (latest first) with a limit of 3:
user=> (scan conn "galaxy:planet" :limit 3 :reverse? true)
For maximum flexibility, an HBase Filter can be passed directly to scan via a :filter param.
Here is an example with ColumnPrefixFilter; all other HBase filters work the same way.
The data we work with:
user=> (scan conn "galaxy:planet")
{"earth"
{:age "4.543 billion years",
:inhabited? true,
:population 7125000000},
"mars" {:age "4.503 billion years", :inhabited? true, :population 3},
"neptune" {:age "4.503 billion years", :inhabited? :unknown},
"pluto" {},
"saturday" {:age "24 hours", :inhabited? :sometimes},
"saturn" {:age "4.503 billion years", :inhabited? :unknown}}
Creating a filter that only looks at rows whose columns start with "ag", and scanning with it:
user=> (def f (ColumnPrefixFilter. (.getBytes "ag")))
#'user/f
user=> (scan conn "galaxy:planet" :filter f)
{"earth" {:age "4.543 billion years"},
"mars" {:age "4.503 billion years"},
"neptune" {:age "4.503 billion years"},
"saturday" {:age "24 hours"},
"saturn" {:age "4.503 billion years"}}
Similarly, creating a filter that only looks at rows whose columns start with "pop", and scanning with it:
user=> (def f (ColumnPrefixFilter. (.getBytes "pop")))
#'user/f
user=> (scan conn "galaxy:planet" :filter f)
{"earth" {:population 7125000000},
"mars" {:population 3}}
In order to get more intel on when the results were last updated, you can add :with-ts? true to scan. It will look at all the cells in the result row and return the latest timestamp as :last-updated.
user=> (scan conn "galaxy:planet")
{"earth"
{:age "4.543 billion years",
:inhabited? true,
:population 7125000000},
"mars" {:age "4.503 billion years", :inhabited? true, :population 3},
"neptune" {:age "4.503 billion years", :inhabited? :unknown},
"pluto" {:one 1, :three 3, :two 2},
"saturday" {:age "24 hours", :inhabited? :sometimes},
"saturn" {:age "4.503 billion years", :inhabited? :unknown}}
and this is what the result with :with-ts? true will look like:
user=> (scan conn "galaxy:planet" :with-ts? true)
{"earth"
{:last-updated 1449681589719,
:age "4.543 billion years",
:inhabited? true,
:population 7125000000},
"mars"
{:last-updated 1449681589719,
:age "4.503 billion years",
:inhabited? true,
:population 3},
"neptune"
{:last-updated 1449681589719,
:age "4.503 billion years",
:inhabited? :unknown},
"pluto" {:last-updated 1449681589719, :one 1, :three 3, :two 2},
"saturday"
{:last-updated 1449681589719,
:age "24 hours",
:inhabited? :sometimes},
"saturn"
{:last-updated 1449681589719,
:age "4.503 billion years",
:inhabited? :unknown}}
not exactly interesting, since all the rows were stored in batch at the same exact millisecond. Let's spice it up.
Have you heard the latest news about life on Saturn? Let's record it:
user=> (store conn "galaxy:planet" "saturn" "galaxy" {:inhabited? true})
and scan again:
user=> (scan conn "galaxy:planet" :with-ts? true)
{"earth"
{:last-updated 1449681589719,
:age "4.543 billion years",
:inhabited? true,
:population 7125000000},
"mars"
{:last-updated 1449681589719,
:age "4.503 billion years",
:inhabited? true,
:population 3},
"neptune"
{:last-updated 1449681589719,
:age "4.503 billion years",
:inhabited? :unknown},
"pluto" {:last-updated 1449681589719, :one 1, :three 3, :two 2},
"saturday"
{:last-updated 1449681589719,
:age "24 hours",
:inhabited? :sometimes},
"saturn"
{:last-updated 1449682282217,
:age "4.503 billion years",
:inhabited? true}}
notice Saturn's last-updated timestamp: it is now 1449682282217.
In some cases we only need the row keys, without the associated data. In that case, pass :keys-only? true to scan:
user=> (scan conn "galaxy:planet" :from "ea"
:to "ma"
:keys-only? true)
{"earth" {}}
Of course all of the above can be combined together, and that's the beauty of scanners:
user=> (scan conn "galaxy:planet" :family "galaxy"
:columns #{:age}
:from "ma"
:to "z"
:time-range {:to-ms 1440880036630})
{"mars" {:age "4.503 billion years"},
"neptune" {:age "4.503 billion years"}}
There are lots of other ways to "scan the cat", but for now here are several.
By default scan will return a realized (not lazy) result as a map. In case too much data is expected to come back, or the problem is best solved in batches, scan can be asked to return a lazy sequence of result maps instead by calling lazy-scan.
IMPORTANT: it is the responsibility of the caller to close the table and scanner (a sketch of one way to do that follows the examples below).
user=> (lazy-scan conn "galaxy:planet")
{:table <table>
:scanner <scanner>
:rows (["earth"
{:age "4.543 billion years",
:inhabited? true,
:population 7125000000}]
["mars" {:age "4.503 billion years", :inhabited? true, :population 3}]
["neptune" {:age "4.503 billion years", :inhabited? :unknown}]
["pluto" {}]
["saturday" {:age "24 hours", :inhabited? :sometimes}]
["saturn" {:age "4.503 billion years", :inhabited? true}])}
it is really a LazySeq:
user=> (type (:rows (lazy-scan conn "galaxy:planet")))
clojure.lang.LazySeq
whereas by default it is a map:
user=> (type (scan conn "galaxy:planet"))
clojure.lang.PersistentArrayMap
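Since the caller owns the table and the scanner, a typical pattern is to consume :rows and close both in a finally block. A minimal sketch, assuming the :table and :scanner values are the usual HBase client Table and ResultScanner objects (both are Closeable):
(let [{:keys [table scanner rows]} (lazy-scan conn "galaxy:planet")]
  (try
    (doseq [[row-key columns] rows]     ;; consume the lazy rows, e.g. in batches
      (println row-key "=>" columns))
    (finally
      (.close scanner)                  ;; release the server-side scanner
      (.close table))))                 ;; release the table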
;; args: conn, table, row key, [family, columns]
user=> (delete conn "galaxy:planet" "earth" "galaxy" #{:age :population})
user=> (find-by conn "galaxy:planet" "earth")
{:inhabited? true}
;; args: conn, table, row key, [family, columns]
user=> (delete conn "galaxy:planet" "earth" "galaxy")
user=> (find-by conn "galaxy:planet" "earth")
nil
;; args: conn, table, row key, [family, columns]
user=> (delete conn "galaxy:planet" "mars")
user=> (find-by conn "galaxy:planet" "mars")
nil
There is often a case where rows need to be deleted by criteria similar to those used in scan (i.e. by row key prefix, time range, etc.). HBase does not really help there, besides providing a BulkDeleteEndpoint coprocessor.
This is not ideal, as it delegates work to HBase "stored procedures" (effectively that is what coprocessors are). It really pays off during massive data manipulation, since it happens directly on the server, but in simpler cases, which are many, coprocessors are less than ideal.
cbass achieves "deleting by anything" with a trivial flow: "scan + multi delete", packed into a delete-by function which preserves scan's syntax:
user=> (scan conn "galaxy:planet")
{"earth"
{:age "4.543 billion years",
:inhabited? true,
:population 7125000000},
"neptune" {:age "4.503 billion years", :inhabited? :unknown},
"pluto" {},
"saturday" {:age "24 hours", :inhabited? :sometimes},
"saturn" {:age "4.503 billion years", :inhabited? :unknown}}
user=> (delete-by conn "galaxy:planet" :from "sat" :to "saz")
;; deleting [saturday saturn], since they both match the 'from/to' criteria
look ma, no saturn, no saturday:
user=> (scan conn "galaxy:planet")
{"earth"
{:age "4.543 billion years",
:inhabited? true,
:population 7125000000},
"neptune" {:age "4.503 billion years", :inhabited? :unknown},
"pluto" {}}
and of course any other criteria that are available in "scan" are available in "delete-by".
Most of the time HBase keys are prefixed (salted with a prefix). This is done to avoid "RegionServer hotspotting".
"delete-by" internally does a "scan" and returns the keys that matched. Hence, in order to delete these keys, they have to be "re-salted" according to the custom key design.
cbass addresses this by taking an optional delete-key-fn, which allows putting "some salt back" on those keys.
Here is a real world example:
;; HBase data
user=> (scan conn "table:name")
{"���|8276345793754387439|transfer" {...},
"���|8276345793754387439|match" {...},
"���|8276345793754387439|trade" {...},
"�d\k^|28768787578329|transfer" {...},
"�d\k^|28768787578329|match" {...},
"�d\k^|28768787578329|trade" {...}}
a couple of observations about the key: it starts with a few (unreadable here) salt bytes, and its parts (salt, id, type) are separated by a pipe ("|").
In order to delete, say, all keys that start with 8276345793754387439, besides providing :from and :to, we would need to provide a :row-key-fn that would de-salt and split the key, and then a delete-key-fn that can reassemble it back:
(delete-by conn progress :row-key-fn (comp split-key without-salt)
                         :delete-key-fn (fn [[x p]] (with-salt x p))
                         :from (-> "8276345793754387439" salt-pipe)
                         :to (-> "8276345793754387439" salt-pipe+))
The *salt, *split and *pipe functions are not from cbass; they are here to illustrate how "delete-by" can be used to take on the real world.
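For illustration only, here is one way such helpers might look, assuming a key layout of "<salt>|<id>|<type>" where the salt is a short prefix derived from the id (all of these helpers are hypothetical and not part of cbass):
(require '[clojure.string :as str])

(defn salt [id]                         ;; hypothetical: 3 digit prefix derived from the id
  (format "%03d" (mod (hash id) 1000)))

(defn without-salt [^bytes row-key]     ;; drop the leading "salt|" from a raw HBase key
  (let [k (String. row-key)]
    (subs k (inc (.indexOf k "|")))))

(defn split-key [k]                     ;; "id|type" => ["id" "type"]
  (str/split k #"\|"))

(defn with-salt [id type]               ;; reassemble the full "salt|id|type" key
  (str (salt id) "|" id "|" type))

(defn salt-pipe [id]                    ;; lower (:from) boundary => "salt|id|"
  (str (salt id) "|" id "|"))

(defn salt-pipe+ [id]                   ;; upper (:to) boundary, just past "salt|id|"
  (str (salt id) "|" id "|~"))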
;; HBase data after the "delete-by"
user=> (scan conn "table:name")
{"�d\k^|28768787578329|transfer" {...},
"�d\k^|28768787578329|match" {...},
"�d\k^|28768787578329|trade" {...}}
HBase requires all data to be stored as bytes, i.e. byte arrays. Hence some serialization / deserialization defaults are good to have.
cbass uses the great nippy serialization library by default, but of course not everyone uses nippy, plus there are cases where the work needs to be done on a pre-existing dataset.
Serialization in cbass is pluggable via the pack-un-pack function, which takes two functions: one to pack and one to unpack:
(pack-un-pack {:p identity :u identity})
In the case above we are just muting packing and unpacking, relying on custom serialization being done prior to calling cbass (so the data is already a byte array), with deserialization done on the value returned from cbass, since it will just return a byte array back in this case (i.e. identity for both).
But of course any other pack/unpack functions can be provided to let cbass know how to serialize and deserialize.
cbass keeps an internal state of pack/unpack functions, so pack-un-pack would usually be called just once, when an application starts.
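Any pair of functions that turn a value into bytes and back can be plugged in this way. For example, a sketch that (roughly) mirrors the nippy default explicitly, assuming taoensso.nippy is on the classpath:
(require '[taoensso.nippy :as nippy])

(pack-un-pack {:p nippy/freeze   ;; value  -> byte[]
               :u nippy/thaw})   ;; byte[] -> value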
While calling pack-un-pack works, it is better to specify serializers locally, per connection. The new-connection function takes pack and unpack as optional arguments, and this is the preferred way to plug in serializers vs. pack-un-pack:
(def conn (new-connection conf :pack identity
:unpack identity))
HBase offers counters in the form of the mutation API. One caveat is that counter data isn't serialized with nippy, so we have to manage deserialization ourselves:
=> (cbass/pack-un-pack {:p #(cbass.tools/to-bytes %) :u identity})
=> (require '[cbass.mutate :as cmut])
=> (cmut/increment conn "galaxy:planet" "mars" "galaxy" :landers 7)
#object[org.apache.hadoop.hbase.client.Result
        0x7017e957
        "keyvalues={mars/galaxy:landers/1543441160950/Put/vlen=8/seqid=0}"]
=> (find-by conn "galaxy:planet" "mars" "galaxy")
{:last-updated 1543441160950,
:age #object["[B" 0x2207b2e6 "[B@2207b2e6"],
:inhabited? #object["[B" 0x618e78f7 "[B@618e78f7"],
:landers #object["[B" 0xd63e8e6 "[B@d63e8e6"],
:population #object["[B" 0x644599bb "[B@644599bb"]}
=> (cbass.tools/bytes->num (:landers (find-by conn "galaxy:planet" "mars" "galaxy")))
7
There's support for batch processing of increments as well as for using the async BufferedMutator for high throughput. See the source for more info.
Copyright © 2018 tolitius
Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.