This section describes the underlying system in more detail, to give the user an idea of how it can be used.
All tests in the package are run with Leiningen and live in the test/onyx_java/test directory.
This directory contains a file called pure_java.clj. The tests in its namespace
do nothing but wrap behavior contained in the test/java/onyxplatform/test directory (the onyxplatform.test package).
The clj file holds several unit tests, covering all behaviors in the org.onyxplatform.api.java.API class.
Each test follows the same essential pattern: set up, run, and tear down.
One of the tests is called single-java-test:
    (deftest single-java-test
      (let [testObject (SingleJavaTest. "onyx-env.edn")           ; set up
            inputs     [{:pass-through "PASSTHROUGH"}]
            ;; shape of the full output; only the first segment is asserted below
            expected   {:out [{:pass-through "PASSTHROUGH"} :done]}
            outputs    (.runJobCollectOutputs testObject inputs)] ; run
        (.shutdown testObject)                                    ; tear down
        (is (= (first inputs) (first (:out outputs))))))
The SingleJavaTest class used by this test is defined in the onyxplatform.test package:

    package onyxplatform.test;

    import clojure.java.api.Clojure;
    import clojure.lang.IFn;
    import clojure.lang.IPersistentMap;

    import org.onyxplatform.api.java.Catalog;
    import org.onyxplatform.api.java.Task;
    import org.onyxplatform.api.java.utils.MapFns;
    import org.onyxplatform.api.java.instance.BindUtils;

    /**
     * SingleJavaTest tests a Job running with a single Java function, built
     * using the dynamic class loader. All basic behavior is set up using the
     * JobBuilder base class, while the pure Java object instance function is
     * added within this class itself.
     */
    public class SingleJavaTest extends JobBuilder {

        /**
         * Constructs a simple Job test that can run a pure java function, loaded
         * from an EDN file which is passed as the only parameter.
         * @param onyxEnvConfig path to the EDN file specifying how to set up the job
         */
        public SingleJavaTest(String onyxEnvConfig) {
            super(onyxEnvConfig, 5, 50);
        }

        /**
         * Adds an Object instance of the test function to the Job catalog
         */
        public void configureCatalog() {
            Catalog c = job.getCatalog();
            BindUtils.addFn(c, "pass",
                            batchSize(), batchTimeout(),
                            "onyxplatform.test.PassFn", MapFns.emptyMap());
        }
    }
An Onyx environment EDN file such as onyx-env.edn looks like this:

    {
     :generateTenancyId true
     :peerEdn "dev-peer-config.edn"
     :envEdn "env-config.edn"
     :virtualPeerCount 3
     :taskScheduler "onyx.task-scheduler/balanced"
    }
The JobBuilder base class supplies the constructor and the shared job setup:

    /**
     * Constructs a JobBuilder using an EDN configuration, a batchSize, and
     * a batchTimeout.
     * @param onyxEnvConfig path to the EDN file containing the job set up spec
     * @param batchSize integer representing the number of segments tasks should consume at once
     * @param batchTimeout integer representing the maximum time (ms) a task should wait before beginning
     */
    public JobBuilder(String onyxEnvConfig, int batchSize, int batchTimeout) {
        // Use the path supplied by the caller rather than a hard-coded file name.
        onyxEnv = new OnyxEnv(onyxEnvConfig, true);
        this.batchSize = Integer.valueOf(batchSize);
        this.batchTimeout = Integer.valueOf(batchTimeout);
        job = createBaseJob();
    }
    /**
     * Adds asynchronous input/output channels and the appropriate test edges
     * for test functions
     * @return the created Job
     */
    public Job createBaseJob() {
        // Tests share a simple one-function, core.async-backed
        // workflow and all of its bootstrapping. This generates
        // every job entry except the catalog entry for the
        // function itself.
        job = new Job(onyxEnv.taskScheduler());

        job.addWorkflowEdge("in", "pass");
        job.addWorkflowEdge("pass", "out");

        Catalog c = job.getCatalog();
        AsyncCatalog.addInput(c, "in", batchSize, batchTimeout);
        AsyncCatalog.addOutput(c, "out", batchSize, batchTimeout);

        Lifecycles lc = job.getLifecycles();
        AsyncLifecycles.addInput(lc, "in");
        AsyncLifecycles.addOutput(lc, "out");
        return job;
    }

    /**
     * Abstract method that subclasses must implement to add their own
     * catalog entries to the JobBuilder catalog
     */
    public abstract void configureCatalog();
The function that SingleJavaTest adds to the catalog, onyxplatform.test.PassFn, is defined as follows:

    package onyxplatform.test;

    import clojure.lang.IPersistentMap;

    import org.onyxplatform.api.java.instance.OnyxFn;

    /**
     * PassFn is a simple test class extending OnyxFn which is used to test
     * a pure java object instance task in an Onyx Job.
     */
    public class PassFn extends OnyxFn {

        /**
         * Constructor overriding and calling the super constructor of OnyxFn.
         * This must exist for every user Class which uses OnyxFn.
         * @param m An IPersistentMap of constructor arguments
         */
        public PassFn(IPersistentMap m) {
            super(m);
        }

        /**
         * Extended abstract method from OnyxFn required by every subclass of OnyxFn.
         * In this case, consumeSegment simply returns the map that's passed in.
         * In the general use case, this method can do anything with the segment,
         * as long as it always returns an IPersistentMap or PersistentVector of
         * IPersistentMaps.
         * @param m The IPersistentMap containing the input segment
         * @return an IPersistentMap or PersistentVector of IPersistentMaps containing method output
         */
        public Object consumeSegment(IPersistentMap m) {
            return m;
        }
    }
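Notice that this test class extends OnyxFn, which is required when using pure Java objects with Onyx-Java.
Also notice the other requirements for user classes used as tasks: a constructor that takes an IPersistentMap of arguments and passes it to the OnyxFn constructor, and a consumeSegment method that returns either an IPersistentMap or a PersistentVector of IPersistentMaps.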
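The shape of these requirements can be illustrated without Onyx on the classpath. The sketch below is a stand-in only: SegmentFn, PassThrough, and Duplicate are hypothetical names that are not part of Onyx-Java, and java.util.Map and java.util.List take the place of IPersistentMap and PersistentVector. It shows the single-map constructor and both permitted return forms of consumeSegment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for OnyxFn. The real base class works with
// clojure.lang.IPersistentMap rather than java.util.Map.
abstract class SegmentFn {
    protected final Map<String, Object> args;

    // Mirrors the required constructor: a single map of constructor arguments.
    SegmentFn(Map<String, Object> args) {
        this.args = args;
    }

    // Mirrors consumeSegment: returns one map, or a list of maps.
    abstract Object consumeSegment(Map<String, Object> segment);
}

// Returns the input segment unchanged, like PassFn.
class PassThrough extends SegmentFn {
    PassThrough(Map<String, Object> args) {
        super(args);
    }

    @Override
    Object consumeSegment(Map<String, Object> segment) {
        return segment;
    }
}

// Returns two segments for every input segment: the input and a copy of it.
class Duplicate extends SegmentFn {
    Duplicate(Map<String, Object> args) {
        super(args);
    }

    @Override
    Object consumeSegment(Map<String, Object> segment) {
        List<Map<String, Object>> out = new ArrayList<>();
        out.add(segment);
        out.add(new HashMap<>(segment));
        return out;
    }
}
```

A real task class would extend OnyxFn directly and use the Clojure persistent collections, as PassFn does.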
The two JobBuilder methods that the test calls, runJobCollectOutputs and shutdown, are defined as follows:

    /**
     * Runs a job and collects the outputs, returning them inside an IPersistentMap.
     * @param inputs A PersistentVector of input segments to use for running the Job
     * @return An IPersistentMap containing the outputs produced by the job
     */
    public IPersistentMap runJobCollectOutputs(PersistentVector inputs) {
        IPersistentMap jmeta = runJob(inputs);
        return AsyncLifecycles.collectOutputs(job.getLifecycles(), "out");
    }

    /**
     * Completely shuts down the environment associated with the JobBuilder
     */
    public void shutdown() {
        onyxEnv.stopEnv();
    }
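Because shutdown stops the whole environment, a caller may want it to run even when the job throws. What follows is a minimal sketch of that pattern, not part of Onyx-Java: JobRunner is a hypothetical interface exposing only the two methods above, plain lists of strings stand in for segments, and RecordingRunner is a test double added for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface with the two JobBuilder methods used by the tests.
interface JobRunner {
    List<String> runJobCollectOutputs(List<String> inputs);
    void shutdown();
}

final class JobLifecycle {
    private JobLifecycle() {}

    // Runs the job and always shuts the environment down afterwards,
    // whether the run returns normally or throws.
    static List<String> runThenShutdown(JobRunner runner, List<String> inputs) {
        try {
            return runner.runJobCollectOutputs(inputs);
        } finally {
            runner.shutdown();
        }
    }
}

// Test double that echoes its inputs (or fails) and records shutdown calls.
final class RecordingRunner implements JobRunner {
    boolean stopped = false;
    private final boolean fail;

    RecordingRunner(boolean fail) {
        this.fail = fail;
    }

    @Override
    public List<String> runJobCollectOutputs(List<String> inputs) {
        if (fail) {
            throw new IllegalStateException("job failed");
        }
        return new ArrayList<>(inputs);
    }

    @Override
    public void shutdown() {
        stopped = true;
    }
}
```

The same guarantee is available in the Clojure test by wrapping the .runJobCollectOutputs call in try and calling .shutdown in finally.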
Copyright © 2016 Distributed Masonry
Distributed under the Eclipse Public License either version 1.0 or (at
your option) any later version.