Clojure API for a more dynamic Google Cloud Dataflow and (not really battle tested) any other Apache Beam backend.
You can also see ports of the official Dataflow examples in the datasplash.examples namespace.
Here is the classic word count example.
Note: you will need to run (compile 'datasplash.examples) every time you make a change.
(ns datasplash.examples
  (:require [clojure.string :as str]
            [datasplash.api :as ds]
            [datasplash.options :refer [defoptions]])
  (:gen-class))

(defn tokenize
  [^String l]
  (remove empty? (.split (str/trim l) "[^a-zA-Z']+")))

(defn count-words
  [p]
  (ds/->> :count-words p
          (ds/mapcat tokenize {:name :tokenize})
          (ds/frequencies)))

(defn format-count
  [[k v]]
  (format "%s: %d" k v))

(defoptions WordCountOptions
  {:input {:default "gs://dataflow-samples/shakespeare/kinglear.txt"
           :type String}
   :output {:default "kinglear-freqs.txt" :type String}
   :numShards {:default 0 :type Long}})

(defn -main
  [& str-args]
  (let [p (ds/make-pipeline WordCountOptions str-args)
        {:keys [input output numShards]} (ds/get-pipeline-options p)]
    (->> p
         (ds/read-text-file input {:name "King-Lear"})
         (count-words)
         (ds/map format-count {:name :format-count})
         (ds/write-text-file output {:num-shards numShards})
         (ds/run-pipeline))))
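To check what the tokenize and format-count steps compute without starting a pipeline, the same functions can be tried on plain Clojure sequences. This is only a sketch: clojure.core's mapcat and frequencies stand in for ds/mapcat and ds/frequencies, and sample-lines is made-up input, not part of the example namespace.

```clojure
(require '[clojure.string :as str])

;; Same definition as in the word count example above.
(defn tokenize
  [^String l]
  (remove empty? (.split (str/trim l) "[^a-zA-Z']+")))

;; Same definition as in the word count example above.
(defn format-count
  [[k v]]
  (format "%s: %d" k v))

;; Hypothetical input standing in for the lines of a text file.
(def sample-lines ["To be, or not to be" "that is the question"])

;; Plain-sequence stand-in for the :count-words step.
(def counts (frequencies (mapcat tokenize sample-lines)))

;; One formatted line per distinct word, sorted for stable display.
(def report (sort (map format-count counts)))

(run! println report)
```

Tokens are case sensitive here, so "To" and "to" are counted separately, just as they would be in the pipeline version.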
Locally on your machine using a DirectRunner:
(in-ns 'datasplash.examples)
(clojure.core/compile 'datasplash.examples)
(-main "--input=sometext.txt" "--output=out-freq.txt" "--numShards=1")
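Because compile writes class files under *compile-path*, that directory has to exist before the call. The following is a minimal sketch of creating it from the REPL; ensure-compile-dir! is a hypothetical helper, not part of Datasplash, and it falls back to "classes" only when *compile-path* is unbound.

```clojure
(require '[clojure.java.io :as io])

(defn ensure-compile-dir!
  "Creates the directory named by *compile-path* if it is missing.
  Returns the directory as a java.io.File."
  []
  ;; Fall back to the default name when *compile-path* is nil.
  (let [dir (io/file (or *compile-path* "classes"))]
    (.mkdirs dir)
    dir))

;; Call this once before (clojure.core/compile 'datasplash.examples).
(ensure-compile-dir!)
```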
Remotely on Google Cloud using a DataflowRunner:
Your Google Cloud account and Dataflow access must already be properly configured on your machine.
(in-ns 'datasplash.examples)
(clojure.core/compile 'datasplash.examples)
(-main "--project=my-project"
       "--runner=DataflowRunner"
       "--gcpTempLocation=gs://bucket/tmp"
       "--input=gs://apache-beam-samples/shakespeare/kinglear.txt"
       "--output=gs://bucket/outputs/kinglear-freq.txt"
       "--numShards=1")
Datasplash needs to be AOT compiled, so you should prepare an uberjar and run from your main entry like so:
java -jar my-dataflow-job-uber.jar [beam-args]
[...] require.

If you write your own ParDo objects (you shouldn't), wrap all your code in the safe-exec macro to avoid issues with unbound vars. Any good ideas for a better way to do this would be greatly appreciated!

If UserCodeException stacktraces seen in the cloud UI are mangled and missing the relevant part of the Clojure source code, this is due to a bug in the way the SDK mangles stacktraces in Clojure. In this case, look for ClojureRuntimeException in the logs to find the original, unaltered stacktrace.

proxy results are not Serializable anymore, so you cannot use Clojure code that relies on proxy anywhere in your pipeline. Use a Java shim for these objects instead.

If you see java.lang.ClassNotFoundException: Options, you probably forgot to compile your namespace.

Avoid calling (require) because it's not thread safe. See [this issue] for a workaround.

If you see java.io.IOException: No such file or directory when invoking compile, make sure there is a directory in your project root that matches the value of *compile-path* (default classes).

The Beam Java SDK does not pull in the Zstd library by default, so it is the user's responsibility to declare an explicit dependency on zstd-jni. Attempts to read or write .zst files without this library loaded will result in a NoClassDefFoundError at runtime.
The Beam Java SDK does not pull in the required libraries for LZOP compression by default, so it is the user's responsibility to declare an explicit dependency on io.airlift:aircompressor and com.facebook.presto.hadoop:hadoop-apache2. Attempts to read or write .lzo files without those libraries loaded will result in a NoClassDefFoundError at runtime.
See Apache Beam Compression enum for details.
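As an illustration, the compression dependencies described above could be declared roughly as follows in a deps.edn file. This is a sketch under assumptions: the use of deps.edn rather than another build tool is a guess, the com.github.luben group for zstd-jni does not appear in this README, and "VERSION" is a placeholder to replace with a release compatible with your Beam version.

```clojure
;; deps.edn fragment (sketch). "VERSION" is a placeholder, not a real
;; version string; this README does not specify any versions.
{:deps
 {;; Zstd support for .zst files (group id is an assumption)
  com.github.luben/zstd-jni {:mvn/version "VERSION"}
  ;; LZOP support for .lzo files
  io.airlift/aircompressor {:mvn/version "VERSION"}
  com.facebook.presto.hadoop/hadoop-apache2 {:mvn/version "VERSION"}}}
```

Only the entries for the compression formats you actually read or write need to be included.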
Copyright © 2015-2024 Oscaro.com
Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.