rowboat

This Clojure library can be used to extract data from a SQL database or an ElasticSearch index into AWS S3 storage. The following steps are performed:

  1. Extract data from the database or ElasticSearch.
  2. Chunk the data into portions of no more than 1,000,000 rows (the Excel limit). The chunk size is configurable; see the sketch after this list.
  3. Format the data as CSV.
  4. Zip the generated CSV file(s).
  5. Upload the zipped file to AWS S3.
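
As a rough, hypothetical illustration of the chunking step (not rowboat's actual code), splitting a possibly lazy sequence of rows into Excel-sized portions could look like this:

;; Illustration only: split a (possibly lazy) sequence of rows into
;; chunks of at most 1,000,000 rows each. partition-all is lazy, so
;; chunking can begin as soon as the first rows arrive.
(def excel-row-limit 1000000)

(defn chunk-rows [rows]
  (partition-all excel-row-limit rows))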

Notably, the library does all of this by streaming the data through the steps, so that:

  • Chunking begins as soon as the first rows become available.
  • CSV formatting begins as soon as the first chunk appears.
  • Zipping begins as soon as the first CSV rows are produced.
  • Uploading begins as soon as the first zipped data becomes available.

One doesn't wait for any of the steps to complete before beginning the next one. The data flows like water in a river, in a steady stream and without overflowing the banks; that is to say, without piling up in memory at any point. Hence the name: rowboat.

Rowboat Library

This is accomplished by calling Akka Streams from Clojure. This Scala library can be configured to stream data through any graph of transformations (applied at the graph's nodes), even a graph with loops, while handling backpressure: at no producer-to-consumer edge of the graph does the producer provide more data than the consumer has the capacity to process at any given time.

Two Akka graphs are used for this purpose, connected by Clojure channels, like a wormhole linking two separate universes. This is because the zipping node requires a Source of Sources. (For technical details, including an ASCII drawing of the graphs and the wormhole, see the extensive comments in the code itself, especially in the rowboat.core namespace.)

Thus, Clojure and Scala help each other to perform the streaming task.
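
The details of those graphs are internal to the library, but the backpressure idea can be illustrated with a small, hypothetical core.async sketch (this is not rowboat's implementation, just the bounded-buffer principle):

(require '[clojure.core.async :as async])

;; A bounded channel gives natural backpressure: a producer that tries
;; to put onto a full buffer parks until the consumer catches up.
(def rows-ch (async/chan 16)) ;; at most 16 rows buffered at any time

;; Producer: emits rows one at a time, parking when the buffer is full.
(async/go
  (doseq [row (range 100)]
    (async/>! rows-ch row))
  (async/close! rows-ch))

;; Consumer: takes rows as they become available.
(async/go-loop []
  (when-some [row (async/<! rows-ch)]
    (println "processing row" row)
    (recur)))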

Usage: SQL Database

;; AWS configuration.

(def region "us-east-1") ;; or your AWS region
(def access-key-id nil)  ;; or your AWS access key ID
(def secret-key nil)     ;; or your AWS secret key
(def profile "saml")     ;; or your AWS profile (only used if the keys are nil)
(def bucket "my-bucket") ;; or your AWS S3 bucket's name

(def aws-config {:profile profile :region region})

;; Database configuration.

(def db-username "my-username") ;; or your database username
(def db-password "my-password") ;; or your database password
(def db-url "jdbc:postgresql://my-server/my-database") ;; PostgreSQL as an example

(def db-config {:url         db-url
                :adapter     "postgresql"
                :username    db-username
                :password    db-password
                :read-only   true  ;; We will only read the data, after all.
                :auto-commit false ;; If true, it will be overridden to false anyway.
                })

(def datasource (hikari-cp.core/make-datasource db-config))

;; Callback functions to signal the various lifecycle changes,
;; as well as to perform notifications (email, for example) when
;; ready, as well as any bookkeeping operations.

(def on-start
  (fn [{:keys [id]}]
    (println "Processing" id)))
(def on-success
  (fn [{:keys [id size filename]}]
    ;; This would also be a good time to notify the requesting user
    ;; that the file is available for download.
    (println "Successful" id "Size:" size "File:" filename)))
(def on-failure
  (fn [{:keys [id error]}]
    (println "Failed" id "Error:" error)))
(def on-empty
  (fn [{:keys [id]}]
    (println "Empty" id)))

(def callbacks {:on-start   on-start
                :on-success on-success
                :on-failure on-failure
                :on-empty   on-empty})

;; Overall config and execution.

(def config {:bucket     bucket
             :aws-config aws-config
             :datasource datasource
             ;; fetch-size is the number of rows fetched from the
             ;; database at a time (1024 by default).
             :opts       {:fetch-size 512}})

(def query "SELECT first_name, last_name, state, zip FROM users
            WHERE status = 'ACTIVE' ORDER BY zip")

(def event {:size    1000
            :type    :jdbc
            :prefix  "users"
            :aliases {:zip "ZIP"}
            :fields  [:first-name :last-name :state :zip]
            :query   query})

(rowboat.core/produce-extract event config callbacks)

Here, :fields is the list of columns (or ElasticSearch fields) to be included in the CSV file(s). They should match the expression names in the SELECT clause, up to the conversion between snake_case and kebab-case. The header names in the CSV files are taken from the :aliases map or, if a field is not found there, generated by capitalizing each word of the field name (thus, :first-name becomes "First Name").
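
As a minimal sketch of that rule (a hypothetical helper, not rowboat's actual function), deriving a header name might look like this:

(require '[clojure.string :as str])

;; Look the field up in :aliases first; otherwise capitalize each
;; dash-separated word of the field name.
(defn header-name [aliases field]
  (or (get aliases field)
      (->> (str/split (name field) #"-")
           (map str/capitalize)
           (str/join " "))))

(header-name {:zip "ZIP"} :zip)        ;; => "ZIP"
(header-name {:zip "ZIP"} :first-name) ;; => "First Name"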

No more than the specified :size number of rows will be included in the CSV file(s) in total, though there may be fewer if not enough data exists.

The zipped file will be named prefix-uuid.zip, where prefix comes from the :prefix in the request ("extract" by default) and uuid is the extract ID generated for the request (unless provided as :id in the event). The CSV file(s) inside the zipped file are named differently, however: prefix-yyyy-MM-dd_HH-mm-ss_GMT.csv. This is because the zipped file can be renamed when downloaded, but we want the files inside it to have more human-friendly names.
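
A rough sketch of that naming scheme (hypothetical helpers, not rowboat's actual code) could look like this:

(import '(java.util UUID TimeZone Date)
        '(java.text SimpleDateFormat))

;; The zip file: prefix-uuid.zip, where the UUID is the extract ID.
(defn zip-name [prefix id]
  (str prefix "-" (or id (UUID/randomUUID)) ".zip"))

;; The CSV files inside: prefix-yyyy-MM-dd_HH-mm-ss_GMT.csv.
(defn csv-name [prefix]
  (let [fmt (doto (SimpleDateFormat. "yyyy-MM-dd_HH-mm-ss")
              (.setTimeZone (TimeZone/getTimeZone "GMT")))]
    (str prefix "-" (.format fmt (Date.)) "_GMT.csv")))

(zip-name "users" nil) ;; => e.g. "users-<uuid>.zip"
(csv-name "users")     ;; => e.g. "users-2023-08-12_14-05-33_GMT.csv"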

Usage: ElasticSearch

The same as above, except that config must specify :elastic for the ElasticSearch server's configuration and :index for the name of the index, instead of :datasource and :opts. Obviously, the :type of the event should be set to :elastic instead of :jdbc, and :query should be an ElasticSearch query instead of a SQL statement. For example:

(def config {:bucket     bucket
             :aws-config aws-config
             :elastic    {your-config}
             :index      "users"})

(def query "{\"_source\": [\"name.first\", \"name.last\", \"address.state\", \"address.zip\"],
             \"sort\": [{\"address.zip.keyword\": \"desc\"}],
             \"query\": {\"query_string\": {\"query": "((created_date:<2023-08-12) AND (created_date:>2021-08-10))}}})"

(def event {:size    1000
            :type    :elastic
            :prefix  "users"
            :aliases {:name.first    "First Name"
                      :name.last     "Last Name"
                      :address.state "State"
                      :address.zip   "ZIP"}
            :fields  [:name.first :name.last :address.state :address.zip]
            :query   query})

(rowboat.core/produce-extract event config callbacks)

If an ElasticSearch field holds multiple values, they will be concatenated within the same CSV column. Of course, any sort of aggregation can be used in SQL statements as well.

The produce-extract function returns the AWS S3 URI of the generated zipped file.
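
For example, the returned URI can be captured and handed off:

;; produce-extract returns the S3 URI of the uploaded zip file, so it
;; can be stored or passed to the requesting user.
(let [s3-uri (rowboat.core/produce-extract event config callbacks)]
  (println "Extract uploaded to" s3-uri))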
