clojure-data-grinder

The clojure-data-grinder, as the name might indicate, is built to process and "grind" data. Receive data from anywhere, process it, and store it however and wherever you want.

Usage

Summary

The clojure-data-grinder uses .edn files as the main way to configure a pipeline.

A simple pipeline may be composed of five things:

  • A source;
  • A grinder;
  • An enricher;
  • A sink;
  • A job.

The source is the part of the pipeline that retrieves the data to be processed. The grinder is responsible for processing the data and outputting it in the required format. The enricher is used to enrich processed or collected data with cached data from another source, which might be a DB, a file, etc. The sink is responsible for outputting the processed data, be it to a DB, a cloud platform, etc. The job runs at a specific scheduled point in time and can perform whatever is needed of it, such as loading data from a database and outputting it, refreshing cached data, etc.

A grinder is not mandatory, though. You can save the data from the source directly with a sink, if that is all that a specific data source needs.
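As a rough illustration of a pipeline without a grinder, the sketch below wires a source straight into a sink. It reuses the keys and the FileWatcherSource type from the full configuration in the next section; the watch directory is a placeholder, and whether the unused step vectors may be left out entirely is an assumption.

```clojure
;; Sketch only: a source writing directly to the channel a sink reads from.
;; "/tmp/input" is a placeholder path; omitting :grinders, :enrichers and :jobs
;; is assumed to be allowed and is not confirmed by the original.
{:steps #profile {:default {:channels [{:name "line-sink"
                                        :buffer-size 10}]
                            :sources [{:name "file-reader"
                                       :conf {:watch-dir "/tmp/input"
                                              :channels {:out [{:output-name :out-channel
                                                                :output-channel "line-sink"}]}}
                                       :type clojure-data-grinder-core.core/map->FileWatcherSource
                                       :threads 1}]
                            :sinks [{:name "line-sink"
                                     :conf {:channels {:in "line-sink"}}
                                     :x-fn (fn [v]
                                             (clojure.tools.logging/info "Value sunk!" v))
                                     :poll-frequency-ms 1000
                                     :threads 1}]}}}
```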

Configuration

{:api-server #profile {:default {:port 8080}
                       :staging {}
                       :production {}}
 :steps #profile {:default {:channels [{:name "line-interpreter"
                                        :buffer-size 10}
                                       {:name "line-enricher"
                                        :buffer-size 10}
                                       {:name "line-sink"
                                        :buffer-size 10}]
                            :sources [{:name "file-reader"
                                       :conf {:watch-dir "/mnt/c/Users/iCeMan/Desktop/input"
                                              :channels {:out [{:output-name :out-channel
                                                                :output-channel "line-interpreter"}]}}
                                       :v-fn (fn [conf]
                                               (when (nil? (-> conf :channels :out :out-channel))
                                                 {:data "No out channel name found!"}))
                                       :type clojure-data-grinder-core.core/map->FileWatcherSource
                                       :threads 1}]
                            :grinders [{:name "line-interpreter"
                                        :conf {:channels {:in "line-interpreter"
                                                          :out [{:output-name :out-channel
                                                                 :output-channel "line-enricher"}]}}
                                        :x-fn cdg-test.core/interpret-file
                                        :poll-frequency-ms 3000
                                        :threads 1}]
                            :enrichers [{:name "file-cacher"
                                         :conf {:channels {:in "line-enricher"
                                                           :out [{:output-name :out-channel
                                                                  :output-channel "line-sink"}]}}
                                         :x-fn (fn [c v]
                                                 (assoc v :a (:a c)))
                                         :poll-frequency-ms 2000
                                         :c-fn (fn [c]
                                                 {:a 2 :b 3})
                                         :cache-poll-frequency-ms 10000}]
                            :sinks [{:name "line-sink"
                                     :conf {:channels {:in "line-sink"}}
                                     :x-fn (fn [v]
                                             (clojure.tools.logging/info "Value sunk!" v))
                                     :poll-frequency-ms 1000
                                     :threads 1}]
                            :jobs [{:name "dummy-job"
                                    :conf {:schedule "/20 * * * * * *"
                                           :channels {:out [{:output-name :out-channel
                                                             :output-channel "line-sink"}]}}
                                    :x-fn (fn []
                                            "dummy job is running")}]}
                  :staging {}
                  :production {}}}

The current version of the clojure-data-grinder has two main configuration points:

  • API Server;
  • Steps;

API Server

The API server section contains all the configuration for the API part of the clojure-data-grinder. The API gives the user information about how many errors have occurred on specific steps, or a general view of the entire pipeline.

For the moment, its configuration consists only of the port on which the server runs.
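A per-environment port could be sketched as follows. This assumes the #profile tag picks one of the maps according to the active profile, as the tag of the same name does in the Aero configuration library; the port numbers are made up, and how the profile is chosen at start-up is not covered here.

```clojure
;; Sketch only: ports 8081 and 80 are illustrative values, not defaults of the library.
{:api-server #profile {:default {:port 8080}
                       :staging {:port 8081}
                       :production {:port 80}}}
```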

Steps

The steps contain everything related to the pipeline, i.e. everything from the pipes (channels) to the steps themselves. As mentioned above, there are multiple types of steps that can be used in the configuration. These need to be added to the respective vector inside the configuration. Therefore we will end up with:

:sources [] - for all the sources used in the pipeline;

:grinders [] - for all the grinders used in the pipeline;

:enrichers [] - for all the enrichers used in the pipeline;

:sinks [] - for all the sinks used in the pipeline;

:jobs [] - for all the jobs used in the pipeline;

Each step type shares some arguments with all the other types, and also has custom arguments that are only used by that specific type.
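Put together, and following the layout of the full configuration example above, the :steps section can be pictured as the skeleton below. The :channels vector is taken from that example, where each channel is declared with a :name and a :buffer-size. Whether empty vectors are required or may be omitted is not stated in the original, so treat this as a sketch.

```clojure
;; Sketch only: an empty pipeline skeleton showing where each step type goes.
{:steps #profile {:default {:channels  []   ; pipes, each a map of :name and :buffer-size
                            :sources   []
                            :grinders  []
                            :enrichers []
                            :sinks     []
                            :jobs      []}}}
```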

Conf

Each step has an argument called conf. This is the internal configuration of the step. Depending on its type, a step has an input channel, output channels, or both. The channel configuration, i.e. the input channel name and the output channel names and reference keys, is set up in the conf value as well. The reason for this is to make the behaviour of the steps customizable, for example by outputting to different channels depending on the value ingested.

 :conf {:channels {:in "line-interpreter"
                   :out [{:output-name :out-channel
                          :output-channel "line-enricher"}]}}

In the example above, the conf value has an input channel called line-interpreter and a single output channel. The output channel is line-enricher, and the code of the step can refer to it by the key :out-channel.
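A conf with more than one output channel might look like the sketch below. The keys :valid-lines and :rejected-lines and the channel "line-errors" are hypothetical names chosen for illustration; any channel named here would presumably also have to be declared in the :channels vector. How a step function selects one of the keys at run time is not described in the original and is not shown.

```clojure
;; Sketch only: two named outputs, so the step can route values to different channels.
:conf {:channels {:in "line-interpreter"
                  :out [{:output-name :valid-lines
                         :output-channel "line-enricher"}
                        {:output-name :rejected-lines
                         :output-channel "line-errors"}]}}
```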

Sources Configuration

Each source has the following arguments:

  • :name - the name of the step;
  • :conf - a map that contains the configuration needed for this specific source;
  • :v-fn - the validation function, used to validate the configuration provided in :conf;
  • :type - used when a predefined source type (from an external library, for instance) has already been implemented. If it is not present, the step checks whether a custom function (:x-fn) is present to use as the source;
  • :poll-frequency-ms - the poll frequency for the source in milliseconds, i.e. how long it waits before running the source function again;
  • :threads - the number of threads used to run this source (default is 1);
  • :x-fn - the function used as the source. It is only used if :type is not present.
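A validation function can be sketched as below. It follows the convention suggested by the :v-fn in the configuration example, which returns a map with a :data message when something is wrong and nil otherwise; that convention is inferred from the example rather than documented. The function name and the checks are illustrative.

```clojure
(require '[clojure.string :as str])

(defn validate-source-conf
  "Returns nil when conf looks usable, or a map describing the first problem found.
  Assumes conf is the map given under :conf for a file-watching source."
  [conf]
  (cond
    ;; a missing or empty :watch-dir leaves the source with nothing to watch
    (str/blank? (:watch-dir conf))
    {:data "No watch dir found!"}

    ;; :out is a vector of output descriptions; an empty or missing one is an error
    (empty? (get-in conf [:channels :out]))
    {:data "No out channel found!"}))
```

In the configuration it would be referenced by its fully qualified name, in the same way cdg-test.core/interpret-file is referenced for the grinder in the example above.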

Grinders Configuration

Each grinder has the following arguments:

  • :name - the name of the step;
  • :conf - a map that contains the configuration needed for this specific grinder;
  • :type - used when a predefined grinder type (from an external library, for instance) has already been implemented. If it is not present, the step checks whether a custom function (:x-fn) is present to use as the transformation function;
  • :v-fn - the validation function, used to validate the configuration provided in :conf;
  • :x-fn - the transformation function responsible for grinding the input data;
  • :poll-frequency-ms - the poll frequency for the grinder in milliseconds, i.e. how long it waits before reading from the input channel again;
  • :threads - the number of threads used to run this grinder (default is 1).
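A transformation function for a grinder could look like the sketch below. It assumes, by analogy with the sink function (fn [v] ...) in the configuration example, that the function receives one value from the input channel and returns the transformed value. The original does not specify what a source emits, so the semicolon-separated line format and the function name are made up for illustration.

```clojure
(require '[clojure.string :as str])

(defn interpret-line
  "Turns a raw line such as \"42;sensor-a;17.5\" into a map.
  The line layout (id;sensor;reading) is a hypothetical input format."
  [line]
  (let [[id sensor reading] (str/split line #";")]
    {:id id
     :sensor sensor
     :reading (Double/parseDouble reading)}))
```

Keeping the function pure, with no channel or configuration access, makes it easy to test outside the pipeline.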

Enrichers Configuration

Each enricher has the following arguments:

  • :name - the name of the step;
  • :conf - a map that contains the configuration needed for this specific enricher;
  • :type - used when a predefined enricher type (from an external library, for instance) has already been implemented. If it is not present, the step checks whether a custom function (:x-fn) is present to use as the mixing function;
  • :v-fn - the validation function, used to validate the configuration provided in :conf;
  • :x-fn - the transformation function responsible for mixing the cache into the input data;
  • :poll-frequency-ms - the poll frequency for the enricher in milliseconds, i.e. how long it waits before reading from the input channel again;
  • :c-fn - the cache function, which is run every time the cache is updated;
  • :cache-poll-frequency-ms - the poll frequency in milliseconds at which the cache is refreshed;
  • :threads - the number of threads used to run this enricher (default is 1).
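The pair of enricher functions can be sketched as follows, using the same arities as the configuration example: a one-argument :c-fn that returns the cache, and a two-argument :x-fn that receives the cache and the value. The original does not say what the argument of :c-fn holds, so it is ignored here. The function names and the sensor data are invented for illustration.

```clojure
(defn load-sensor-names
  "Cache function (:c-fn). Returns the data to be cached.
  A real implementation would read from a DB or a file; this returns fixed data.
  The argument is ignored because its meaning is not documented."
  [_arg]
  {"sensor-a" "Boiler room"
   "sensor-b" "Roof"})

(defn add-sensor-name
  "Mixing function (:x-fn). Receives the cache and the value and returns the
  value with a :sensor-name looked up in the cache, or \"unknown\" if absent."
  [cache value]
  (assoc value :sensor-name (get cache (:sensor value) "unknown")))
```

With :cache-poll-frequency-ms set to 10000 as in the example, the cache function would be re-run roughly every ten seconds, while the mixing function runs for each value read from the input channel.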

Sinks Configuration

Each sink has the following arguments:

  • :name - the name of the step;
  • :conf - a map that contains the configuration needed for this specific sink;
  • :type - used when a predefined sink type (from an external library, for instance) has already been implemented. If it is not present, the step checks whether a custom function (:x-fn) is present to use as the sink function;
  • :v-fn - the validation function, used to validate the configuration provided in :conf;
  • :x-fn - the function responsible for sinking the input data;
  • :poll-frequency-ms - the poll frequency for the sink in milliseconds, i.e. how long it waits before reading from the input channel again;
  • :threads - the number of threads used to run this sink (default is 1).
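As an alternative to the logging sink in the configuration example, a sink function that appends each value to a file could be sketched like this. The function name and the output path are hypothetical; only clojure.core functions are used.

```clojure
(defn append-to-file
  "Appends the printed form of v as a single line to the file at out-path."
  [out-path v]
  (spit out-path (str (pr-str v) "\n") :append true))

;; The sink function in the configuration example takes one argument,
;; so the path is fixed up front. "/tmp/cdg-output.edn" is a placeholder.
(def sink-to-file
  (partial append-to-file "/tmp/cdg-output.edn"))
```

With more than one sink thread, concurrent appends to the same file may interleave, so a single thread is the safer choice for a sink like this one.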

Jobs Configuration

Each job has the following arguments:

  • :name - the name of the step;
  • :conf - a map that contains the configuration needed for this specific job;
  • :type - used when a predefined job type (from an external library, for instance) has already been implemented. If it is not present, the step checks whether a custom function (:x-fn) is present to use as the job function;
  • :v-fn - the validation function, used to validate the configuration provided in :conf;
  • :x-fn - the function responsible for retrieving whatever data will be output.
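A job entry could be sketched as below. The :schedule key lives inside :conf, as in the configuration example, and its string is reused verbatim from that example because the schedule syntax is not documented here. The job name and the returned map are made up, and it is assumed that the return value of the zero-argument :x-fn is what gets written to the output channel.

```clojure
;; Sketch only: a job that emits a small heartbeat map on every scheduled run.
{:name "heartbeat-job"
 :conf {:schedule "/20 * * * * * *"
        :channels {:out [{:output-name :out-channel
                          :output-channel "line-sink"}]}}
 :x-fn (fn []
         {:type :heartbeat
          :sent-at (System/currentTimeMillis)})}
```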

Can you improve this documentation? These fine people already did:
iCeMan & Fábio Dias Francisco