This Clojure library extracts data from a SQL database or an ElasticSearch index into AWS S3 storage. The following steps are performed:

- The rows are fetched from the database or the index.
- They are converted to CSV.
- The CSV file(s) are compressed into a zip archive.
- The archive is uploaded to an S3 bucket.
Notably, the library does it by streaming the data through all the steps, so that:

- One doesn't wait for any of the steps to be complete in order to begin the next one.
- The data flows like water in a river, in a steady stream and without overflowing the banks—that is to say, without piling up in memory at any point.

Hence, the rowboat name.
This is accomplished by calling Akka Streams from Clojure. This Scala library can be configured to stream data through any graph of transformations (one at each of the graph's nodes), even a graph with loops, while handling backpressure: at no producer-to-consumer edge of the graph does the producer provide more data than the consumer has the capacity to process at any given time.
Two Akka graphs are used for this purpose, connected by Clojure channels—as if by a wormhole linking two separate universes. This is because the zipping node requires a Source of Sources. (For technical details, including an ASCII drawing of the graphs and the wormhole, see the extensive comments in the code itself, especially in the rowboat.core namespace.)
Thus, Clojure and Scala help each other to perform the streaming task.
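As a minimal, hypothetical illustration of this interop style (this is not code from the library itself), the sketch below drives a tiny Akka Streams pipeline from Clojure; it assumes Akka 2.6 and its javadsl on the classpath:

(import '[akka.actor ActorSystem]
        '[akka.stream.javadsl Source Sink])

(def system (ActorSystem/create "demo"))

;; The source emits the integers 1..100 and the sink prints their squares.
;; Demand propagates upstream along every edge, so the source never emits
;; faster than the sink can consume.
(-> (Source/range 1 100)
    (.map (reify akka.japi.function.Function
            (apply [_ n] (* n n))))
    (.runWith (Sink/foreach (reify akka.japi.function.Procedure
                              (apply [_ n] (println n))))
              ;; In Akka 2.6, the ActorSystem itself can serve as the materializer.
              system))

Rowboat's own graphs are, of course, much larger and carry database or ElasticSearch rows rather than integers, but the backpressure mechanism is the same.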
;; AWS configuration.
(def region "us-east-1") ;; or your AWS region
(def access-key-id nil) ;; or your AWS access key ID
(def secret-key nil) ;; or your AWS secret key
(def profile "saml") ;; or your AWS profile (only used if the keys are nil)
(def bucket "my-bucket") ;; or your AWS S3 bucket's name
(def aws-config {:profile profile :region region})
;; Database configuration.
(def db-username "my-username") ;; or your database username
(def db-password "my-password") ;; or your database password
(def db-url "jdbc:postgresql://my-server/my-database") ;; PostgreSQL as an example
(def db-config {:url db-url
                :adapter "postgresql"
                :username db-username
                :password db-password
                ;; We will only read the data, after all.
                :read-only true
                ;; If set to true, it would be overridden to false anyway.
                :auto-commit false})
(def datasource (hikari-cp.core/make-datasource db-config))
;; Callback functions to signal the various lifecycle changes,
;; to send notifications (by email, for example) when the extract
;; is ready, and to perform any bookkeeping.
(def on-start
  (fn [{:keys [id]}]
    (println "Processing" id)))

(def on-success
  (fn [{:keys [id size filename]}]
    ;; This would also be a good time to notify the requesting user
    ;; that the file is available for download.
    (println "Successful" id "Size:" size "File:" filename)))

(def on-failure
  (fn [{:keys [id error]}]
    (println "Failed" id "Error:" error)))

(def on-empty
  (fn [{:keys [id]}]
    (println "Empty" id)))

(def callbacks {:on-start on-start
                :on-success on-success
                :on-failure on-failure
                :on-empty on-empty})
;; Overall config and execution.
(def config {:bucket bucket
             :aws-config aws-config
             :datasource datasource
             ;; :fetch-size is the number of rows fetched from the
             ;; database in each batch (1024 by default).
             :opts {:fetch-size 512}})
(def query "SELECT first_name, last_name, state, zip FROM users
WHERE status = 'ACTIVE' ORDER BY zip")
(def event {:size 1000
            :type :jdbc
            :prefix "users"
            :aliases {:zip "ZIP"}
            :fields [:first-name :last-name :state :zip]
            :query query})
(rowboat.core/produce-extract event config callbacks)
Here, :fields is the list of columns (or ElasticSearch fields) to be included
in the CSV file(s). They should match the expression names in the SELECT clause,
up to the conversion between snake_case and kebab-case. The header names in
the CSV files are generated by lookup in the :aliases map or, if not found there,
by converting the field names to Pascal Case (thus, :first-name becomes
"First Name").
No more than the specified :size number of rows will be included in the CSV
file(s) in total; there may be fewer if the source doesn't have that many matching rows.
The zipped file will have the name prefix-uuid.zip, where prefix is the value of
:prefix provided in the request ("extract" by default) and uuid is the extract ID
generated for the request (unless provided as :id in the event). The CSV file(s)
inside the zipped file are named differently, however: prefix-yyyy-MM-dd_HH-mm-ss_GMT.csv.
This is because the zipped file can be renamed when it is downloaded, but we want
the files inside it to have more human-friendly names.
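A hypothetical sketch of these naming conventions (the helper functions are assumptions; the library's actual formatting code may differ in its details):

(import '[java.time ZonedDateTime ZoneId]
        '[java.time.format DateTimeFormatter])

;; Hypothetical helpers illustrating the two naming schemes.
(defn zip-file-name [prefix id]
  (str prefix "-" id ".zip"))

(defn csv-file-name [prefix]
  (let [formatter (DateTimeFormatter/ofPattern "yyyy-MM-dd_HH-mm-ss")
        now       (ZonedDateTime/now (ZoneId/of "GMT"))]
    (str prefix "-" (.format formatter now) "_GMT.csv")))

(zip-file-name "users" (java.util.UUID/randomUUID))
;; => e.g. "users-0b0e7a3e-....zip"
(csv-file-name "users")
;; => e.g. "users-2023-08-12_14-30-05_GMT.csv"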
For an ElasticSearch index, the usage is the same as above, except that config
must specify :elastic for the ElasticSearch server's configuration and :index
for the name of the index, instead of :datasource and :opts. Obviously, the
:type of the event should be set to :elastic instead of :jdbc, and :query
should be an ElasticSearch query instead of a SQL statement. For example:
(def config {:bucket bucket
             :aws-config aws-config
             :elastic {your-config}
             :index "users"})
(def query "{\"_source\": [\"name.first\", \"name.last\", \"address.state\", \"address.zip\"],
\"sort\": [{\"address.zip.keyword\": \"desc\"}],
\"query\": {\"query_string\": {\"query": "((created_date:<2023-08-12) AND (created_date:>2021-08-10))}}})"
(def event {:size 1000
            :type :elastic
            :prefix "users"
            :aliases {:name.first "First Name"
                      :name.last "Last Name"
                      :address.state "State"
                      :address.zip "ZIP"}
            :fields [:name.first :name.last :address.state :address.zip]
            :query query})
(rowboat.core/produce-extract event config callbacks)
Any multi-valued fields returned by ElasticSearch will be concatenated into the same CSV column. Of course, any sort of aggregation can be used in SQL statements as well.
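As a hypothetical illustration of what that concatenation means (the function and the separator shown are assumptions, not necessarily the library's actual behavior):

(require '[clojure.string :as str])

;; Hypothetical: flatten a multi-valued ElasticSearch field into one CSV cell.
(defn cell-value [v]
  (if (sequential? v)
    (str/join ", " v) ;; assumed separator, for illustration only
    (str v)))

(cell-value ["red" "blue"]) ;; => "red, blue"
(cell-value "green")        ;; => "green"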
The produce-extract function returns the AWS S3 URI of the generated zipped file.