A major concern of SparkPlug is reliable and efficient serialization for Spark programs written in Clojure.
Under the umbrella of serialization, there are two separate problems: serializing task functions and serializing task results.
### Task Functions

These are the functions you pass to RDD transformations like `map` and `filter`. When you invoke an action on the resulting RDD, the driver serializes these functions and broadcasts them to the executors. Executors must be able to deserialize the functions and run them across multiple threads.
Due to the challenges of serializing arbitrary functions, Spark uses built-in Java serialization for task functions. The main difficulty with Clojure functions is that they implicitly depend on their namespaces and the Vars they reference being available at runtime. If Clojure functions are not serialized correctly, your application is bound to crash with confusing errors like "attempting to call unbound fn". To address this, SparkPlug takes the following approach:

- When serializing a task function, determine the set of namespaces it depends on.
- When deserializing a function on an executor, `require` those namespaces first, so that every Var the function needs is bound.
- Perform the requires while holding a global lock, because `clojure.core/require` is not thread-safe! Without synchronization, it's likely to result in non-deterministic "unbound fn" and "unbound Var" errors.
non-deterministic "unbound fn" and "unbound Var" errors.This refers to the data produced by executing tasks. Executors will either send results back to the driver (as in a "collect" action), or pass them on to the next stage for executors to read again.
For task results of Clojure data, such as keywords, maps, and vectors, Java serialization with `java.io.Serializable` is very inefficient. For example, the keyword `:a` gets encoded to a whopping 218 bytes, and the empty vector `[]` becomes 405 bytes!
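You can reproduce these measurements with a small helper that counts the bytes Java serialization produces. `java-serialized-size` is a hypothetical name, not part of SparkPlug, and exact sizes may vary slightly across Clojure versions:

```clojure
;; Hypothetical helper to measure how many bytes Java serialization
;; produces for a value.
(import '(java.io ByteArrayOutputStream ObjectOutputStream))

(defn java-serialized-size
  [value]
  (let [bytes (ByteArrayOutputStream.)]
    (with-open [out (ObjectOutputStream. bytes)]
      (.writeObject out value))
    (count (.toByteArray bytes))))

(java-serialized-size :a)  ;=> 218
(java-serialized-size [])  ;=> 405
```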
SparkPlug solves this using Spark's support for Kryo serialization, defining custom serializers and a registrator to handle common Clojure data types. To use SparkPlug's Kryo serialization, set these Spark properties:
Property | Value
---------|------
`spark.serializer` | `org.apache.spark.serializer.KryoSerializer`
`spark.kryo.registrator` | `sparkplug.kryo.ClassPathRegistrator`
For convenience, SparkPlug's configuration builder functions include these properties by default.
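If you are building the configuration by hand instead, setting the two properties directly with Spark's Java API looks like this (a minimal sketch; the app name is a placeholder):

```clojure
(import 'org.apache.spark.SparkConf)

;; Minimal sketch of configuring Kryo directly; SparkPlug's builder
;; functions set these two properties for you.
(def conf
  (doto (SparkConf.)
    (.setAppName "my-app")
    (.set "spark.serializer" "org.apache.spark.serializer.KryoSerializer")
    (.set "spark.kryo.registrator" "sparkplug.kryo.ClassPathRegistrator")))
```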
The registrator is also extensible, so applications can easily add more serializers and have them included in the registry. See the `sparkplug.kryo` namespace for details.
Since task functions are serialized with `java.io.Serializable`, any Clojure data closed over by a task function is also serialized this way. If you need to close over a relatively large piece of Clojure data in a task function, such as a static lookup table, using a broadcast variable will provide much better performance, because it uses the same serialization path as task results.
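For example, using Spark's Java API directly (a sketch assuming `sc` is an existing `JavaSparkContext`):

```clojure
;; Sketch assuming `sc` is an existing JavaSparkContext. Broadcasting ships
;; the table to each executor via the same (Kryo) path as task results,
;; instead of Java-serializing it inside every task closure.
(def lookup-table
  {:a 1, :b 2, :c 3})

(def broadcast-table
  (.broadcast sc lookup-table))

;; Inside a task function, read the data with (.value broadcast-table).
```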
If you are caching RDDs of Clojure data, consider using a serialized storage level. This will use Kryo serialization, and will save a lot of memory on executors. The tradeoff is that this increases CPU time to access the data.
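For example (a sketch assuming `rdd` is an existing RDD of Clojure values):

```clojure
(import 'org.apache.spark.storage.StorageLevel)

;; MEMORY_ONLY_SER keeps each cached partition as serialized bytes,
;; shrinking the heap footprint at the cost of deserialization CPU
;; on each access.
(.persist rdd (StorageLevel/MEMORY_ONLY_SER))
```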