This Clojure library can be used to extract data from a SQL database or an ElasticSearch index into AWS S3 storage: the data is fetched from the database or index, converted to CSV, compressed into a zip archive, and uploaded to an S3 bucket.
Notably, the library does this by streaming the data through all of the steps, so that:
- No step has to wait for the previous one to complete before it can begin.
- The data flows like water in a river, in a steady stream and without overflowing the banks, that is to say, without piling up in memory at any point. Hence the rowboat name.
This is accomplished by calling Akka Streams from Clojure. That Scala library can be configured to stream data through an arbitrary graph of transformations (one applied at each of the graph's nodes), even a graph with loops, while handling backpressure: at no producer-to-consumer edge of the graph does the producer emit more data than the consumer has the capacity to process at any given time.
Two Akka graphs are used for this purpose, connected by Clojure channels, like a wormhole linking two separate universes. This is because the zipping node requires a Source of Sources. (For technical details, including an ASCII drawing of the graphs and the wormhole, see the extensive comments in the code itself, especially in the rowboat.core namespace.)
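To get a feel for why bounded channels provide backpressure, here is a minimal core.async sketch. It is only a conceptual analogy, not rowboat's actual wormhole code, and it assumes org.clojure/core.async is on the classpath: the producer parks whenever the channel's buffer is full, so rows can never pile up in memory faster than the consumer drains them.
;; Conceptual illustration of backpressure with a bounded Clojure channel
;; (an analogy only; rowboat's real plumbing lives in rowboat.core).
(require '[clojure.core.async :refer [chan go >! <! close!]])

(let [ch (chan 16)] ;; bounded buffer: at most 16 rows in flight
  ;; Producer: parks on >! whenever the buffer is full.
  (go
    (doseq [row (range 1000)]
      (>! ch row))
    (close! ch))
  ;; Consumer: drains rows at its own pace; the producer keeps up with it.
  (go
    (loop []
      (when-some [row (<! ch)]
        ;; ... convert the row to CSV, zip it, upload it, etc. ...
        (recur)))))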
Thus, Clojure and Scala help each other to perform the streaming task.
;; AWS configuration.
(def region "us-east-1") ;; or your AWS region
(def access-key-id nil) ;; or your AWS access key ID
(def secret-key nil) ;; or your AWS secret key
(def profile "saml") ;; or your AWS profile (only used if the keys are nil)
(def bucket "my-bucket") ;; or your AWS S3 bucket's name
(def aws-config {:profile profile :region region})
;; Database configuration.
(def db-username "my-username") ;; or your database username
(def db-password "my-password") ;; or your database password
(def db-url "jdbc:postgresql://my-server/my-database") ;; PostgreSQL as an example
(def db-config {:url         db-url
                :adapter     "postgresql"
                :username    db-username
                :password    db-password
                :read-only   true  ;; We will only read the data, after all.
                :auto-commit false ;; If true, it will be overridden to false anyway.
                })
(def datasource (hikari-cp.core/make-datasource db-config))
;; Callback functions to signal the various lifecycle changes, to send
;; notifications (email, for example) when the extract is ready, and to
;; perform any bookkeeping operations.
(def on-start
  (fn [{:keys [id]}]
    (println "Processing" id)))

(def on-success
  (fn [{:keys [id size filename]}]
    ;; This would also be a good time to notify the requesting user
    ;; that the file is available for download.
    (println "Successful" id "Size:" size "File:" filename)))

(def on-failure
  (fn [{:keys [id error]}]
    (println "Failed" id "Error:" error)))

(def on-empty
  (fn [{:keys [id]}]
    (println "Empty" id)))

(def callbacks {:on-start   on-start
                :on-success on-success
                :on-failure on-failure
                :on-empty   on-empty})
;; Overall config and execution.
(def config {:bucket     bucket
             :aws-config aws-config
             :datasource datasource
             ;; :fetch-size is the number of rows fetched from the
             ;; database in each batch (1024 by default).
             :opts       {:fetch-size 512}})
(def query "SELECT first_name, last_name, state, zip FROM users
WHERE status = 'ACTIVE' ORDER BY zip")
(def event {:size 1000
:type :jdbc
:prefix "users"
:aliases {:zip "ZIP"}
:fields [:first-name :last-name :state :zip]
:query query})
(rowboat.core/produce-extract event config callbacks)
Here, :fields is the list of columns (or ElasticSearch fields) to be included in the CSV file(s). They should match the expression names in the SELECT clause, up to the conversion between snake_case and kebab-case. The header names in the CSV files are generated by looking each field up in the :aliases map or, if it is not found there, by converting the field name to Pascal Case (thus, :first-name becomes "First Name").
No more than the specified :size number of rows will be included in the CSV file(s) in total; there may be fewer if less data is available.
The zipped file will have the name prefix-uuid.zip, where :prefix is provided in the request ("extract" by default) and uuid is the extract ID generated for the request (unless provided as :id in event). The CSV file(s) inside the zipped file are named differently, however: prefix-yyyy-MM-dd_HH-mm-ss_GMT.csv. This is because the zipped file can be renamed when downloading, but we want the files inside it to have more human-friendly names.
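Purely for illustration, names of that shape could be produced like this (a sketch only; rowboat generates these names internally):
;; Sketch of the naming convention described above (illustrative only).
(import 'java.util.UUID
        'java.time.ZonedDateTime
        'java.time.ZoneOffset
        'java.time.format.DateTimeFormatter)

(let [prefix "users"
      id     (UUID/randomUUID)
      stamp  (.format (ZonedDateTime/now ZoneOffset/UTC)
                      (DateTimeFormatter/ofPattern "yyyy-MM-dd_HH-mm-ss"))]
  {:zip-name (str prefix "-" id ".zip")           ;; e.g. "users-<uuid>.zip"
   :csv-name (str prefix "-" stamp "_GMT.csv")})  ;; e.g. "users-2023-08-12_14-05-33_GMT.csv"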
Extracting from ElasticSearch works the same as above, except that config must specify :elastic for the ElasticSearch server's configuration and :index for the name of the index, instead of :datasource and :opts. Obviously, the :type of the event should be set to :elastic instead of :jdbc, and :query should be an ElasticSearch query instead of a SQL statement. For example:
(def config {:bucket     bucket
             :aws-config aws-config
             :elastic    your-elastic-config ;; your ElasticSearch server's configuration
             :index      "users"})
(def query "{\"_source\": [\"name.first\", \"name.last\", \"address.state\", \"address.zip\"],
             \"sort\": [{\"address.zip.keyword\": \"desc\"}],
             \"query\": {\"query_string\": {\"query\": \"((created_date:<2023-08-12) AND (created_date:>2021-08-10))\"}}}")
(def event {:size    1000
            :type    :elastic
            :prefix  "users"
            :aliases {:name.first    "First Name"
                      :name.last     "Last Name"
                      :address.state "State"
                      :address.zip   "ZIP"}
            :fields  [:name.first :name.last :address.state :address.zip]
            :query   query})
(rowboat.core/produce-extract event config callbacks)
If a field has multiple values in ElasticSearch, they will all be concatenated into the same CSV column. Of course, any sort of aggregation can be used in SQL statements as well.
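For example, a one-to-many relationship could be collapsed into a single column on the SQL side (a hypothetical PostgreSQL query with made-up tables, shown only to illustrate the point):
;; Hypothetical aggregation example: one "phones" column per user.
(def aggregating-query
  "SELECT u.first_name, u.last_name, string_agg(p.phone, '; ') AS phones
   FROM users u JOIN phones p ON p.user_id = u.id
   WHERE u.status = 'ACTIVE'
   GROUP BY u.first_name, u.last_name
   ORDER BY u.last_name")
;; with :fields [:first-name :last-name :phones] in the event.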
The produce-extract function returns the AWS S3 URI of the generated zipped file.
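Since the function returns the URI, the caller can capture it directly, for example (usage sketch):
;; Usage sketch: capture the S3 URI of the generated zipped file.
(let [s3-uri (rowboat.core/produce-extract event config callbacks)]
  (println "Extract uploaded to" s3-uri)
  s3-uri)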