Stratum's performance-critical paths are implemented in Java using the JDK Vector API (`jdk.incubator.vector`, incubating since JDK 16 and still an incubator module as of JDK 21). Five class files contain all SIMD and analytics code:
The split into five classes is deliberate: JIT compilation quality degrades as class bytecode size grows (see JIT Lessons). Methods serving different execution paths (array vs chunked vs analytics) are isolated to prevent cross-section JIT interference.
The Vector API provides fixed-width SIMD registers as Java objects:
```java
// Species defines vector width (machine-dependent)
static final VectorSpecies<Double> DOUBLE_SPECIES = DoubleVector.SPECIES_PREFERRED;
static final VectorSpecies<Long> LONG_SPECIES = LongVector.SPECIES_PREFERRED;
// On AVX-512: 8 doubles/8 longs per register. On AVX2: 4/4.

// Load from array, compute, store
DoubleVector v = DoubleVector.fromArray(DOUBLE_SPECIES, prices, i);
DoubleVector d = DoubleVector.fromArray(DOUBLE_SPECIES, discounts, i);
DoubleVector product = v.mul(d);

// Predicate → mask → masked accumulation
VectorMask<Double> mask = v.compare(VectorOperators.LT, threshold);
sum = sum.add(product, mask); // only accumulate where mask is true
```
Key concepts:
- `v.add(other, mask)` — only lanes where mask is true
- `sum.reduceLanes(VectorOperators.ADD)` — horizontal sum to scalar

The key optimization is fusing predicate evaluation and aggregation into a single pass. Traditional approaches materialize intermediate arrays (selection vectors), causing extra memory traffic. Stratum evaluates predicates and accumulates results in the same loop iteration. Fused filter+aggregate is an established technique in native SIMD engines (Velox, Intel OAP, many vectorized database prototypes); Stratum's contribution is achieving it via the Java Vector API without native compilation or JNI.
The workhorse method for single-aggregation queries. Accepts up to 4 long predicates and 4 double predicates with one aggregation:
```java
// Simplified structure (actual code handles all pred/agg type combinations)
for (int i = 0; i < length; i += DOUBLE_LANES) {
    // 1. Evaluate long predicates → combined mask
    LongVector lv0 = LongVector.fromArray(LONG_SPECIES, longCol0, i);
    VectorMask<Long> lm = lv0.compare(GTE, longLo0).and(lv0.compare(LT, longHi0));
    // ... AND additional long predicates

    // 2. Convert long mask to double mask
    VectorMask<Double> mask = VectorMask.fromLong(DOUBLE_SPECIES, lm.toLong());

    // 3. Evaluate double predicates
    DoubleVector dv0 = DoubleVector.fromArray(DOUBLE_SPECIES, dblCol0, i);
    mask = mask.and(dv0.compare(GTE, dblLo0)).and(dv0.compare(LT, dblHi0));
    // ... AND additional double predicates

    // 4. Masked aggregation
    DoubleVector agg = DoubleVector.fromArray(DOUBLE_SPECIES, aggCol, i);
    sum = sum.add(agg, mask);
    count += Long.bitCount(mask.toLong());
}
```
Predicate types are encoded as integer constants: PRED_RANGE=0, PRED_LT=1, PRED_GT=2, PRED_EQ=3, PRED_LTE=4, PRED_GTE=5, PRED_NEQ=6, PRED_NOT_RANGE=7. The method contains unrolled code for each predicate count (0-4 long, 0-4 double) to avoid switches in the hot loop.
Aggregation types: AGG_SUM_PRODUCT=0, AGG_SUM=1, AGG_COUNT=2, AGG_MIN=3, AGG_MAX=4.
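The simplified loop above assumes `length` is a multiple of the lane count. A hedged sketch of how the remainder is typically handled with `SPECIES.loopBound` plus a scalar tail (illustrative only; the actual Stratum code may use masked loads instead, and `TailSum` is a hypothetical name):

```java
import jdk.incubator.vector.DoubleVector;
import jdk.incubator.vector.VectorMask;
import jdk.incubator.vector.VectorOperators;
import jdk.incubator.vector.VectorSpecies;

public class TailSum {
    static final VectorSpecies<Double> S = DoubleVector.SPECIES_PREFERRED;

    // Sum of values below a threshold: vectorized main loop plus scalar tail.
    static double sumBelow(double[] a, double threshold) {
        DoubleVector acc = DoubleVector.zero(S);
        int upper = S.loopBound(a.length);  // largest multiple of the lane count
        int i = 0;
        for (; i < upper; i += S.length()) {
            DoubleVector v = DoubleVector.fromArray(S, a, i);
            VectorMask<Double> m = v.compare(VectorOperators.LT, threshold);
            acc = acc.add(v, m);            // masked accumulation
        }
        double sum = acc.reduceLanes(VectorOperators.ADD);
        for (; i < a.length; i++) {         // scalar remainder
            if (a[i] < threshold) sum += a[i];
        }
        return sum;
    }
}
```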
A critical pattern: fusedSimdUnrolled(...) delegates to fusedSimdUnrolledRange(0, length). This ensures the JIT compiles a single method body that serves both the full-array and range-based entry points. Without this, JIT compiles them independently — warming one doesn't benefit the other (measured: 21ms → 3.2ms).
Stratum uses morsel-driven execution for parallel operations:
```
ForkJoinPool (Runtime.availableProcessors() threads)
│
├── Thread 0: morsels [0..64K), [N*64K..(N+1)*64K), ...
├── Thread 1: morsels [64K..128K), ...
├── Thread 2: ...
└── Thread N-1: ...
```
Each morsel is 64K elements (~512KB for doubles), fitting comfortably in L2 cache (~1-2MB). The alternative — one large range per thread — causes cache thrashing when 8 threads each process 750K elements (24MB scattered across DRAM).
Implementation: N futures submitted to ForkJoinPool, each with an internal morsel loop. This avoids FJP overhead from submitting one future per morsel (which would be 96 futures for 6M rows).
Thread-local results are merged after all threads complete: double[2] addition for scalar aggregations, double[maxKey * accSize] element-wise merge for group-by.
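The N-futures-with-internal-morsel-loop pattern can be sketched as follows (a minimal illustration under stated assumptions, not Stratum's actual code; `MorselSum` and the strided claiming scheme are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

public class MorselSum {
    static final int MORSEL = 64 * 1024;  // 64K elements per morsel

    // One future per thread; each future iterates its morsels internally
    // and keeps a thread-local sum. Partials are merged after completion.
    static double parallelSum(double[] data) {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        int threads = Runtime.getRuntime().availableProcessors();
        List<Future<Double>> futures = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int tid = t;
            futures.add(pool.submit(() -> {
                double local = 0;
                // Thread t handles morsels t, t+threads, t+2*threads, ...
                for (int start = tid * MORSEL; start < data.length;
                     start += threads * MORSEL) {
                    int end = Math.min(start + MORSEL, data.length);
                    for (int i = start; i < end; i++) local += data[i];
                }
                return local;
            }));
        }
        double total = 0;
        try {
            for (Future<Double> f : futures) total += f.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return total;
    }
}
```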
For low-cardinality groups (≤200K distinct keys), Stratum uses direct array indexing instead of hash tables:
```java
// Flat accumulator array: maxKey * accSize (e.g., 2 for SUM: [sum, count])
double[] accs = new double[maxKey * accSize];
for (int i = start; i < end; i++) {
    if (evaluatePredicates(i, ...)) {
        int key = (int) groupCol[i];
        int base = key * accSize;
        accs[base] += aggCol[i];     // sum
        accs[base + 1] += 1.0;       // count
    }
}
```
Key optimizations:
- `double[maxKey * accSize]` instead of `double[maxKey][]` — eliminates pointer chasing, better cache utilization
- `accs[base] += val * matchBit` eliminates branch misprediction at ~50% selectivity
- If `maxKey * accSize * 8 < 64KB`, use morsels (data locality); otherwise a single call per thread (avoids massive per-morsel allocation)
- Separated into `fusedFilterGroupCountDenseRange` for JIT isolation. Uses inline SIMD predicate evaluation and bit-manipulation mask extraction (`toLong()` + `numberOfTrailingZeros()` + `bits &= bits - 1`) instead of boolean array intermediaries. Thread-local `long[]` counts merged at the end.
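The branchless accumulation mentioned above can be sketched in scalar form: the predicate result becomes a 0.0/1.0 multiplier, so both accumulator updates always execute and the JIT can typically compile the comparison without a data-dependent branch (illustrative only; Stratum's real path uses SIMD predicate evaluation, and `groupSumCount` is a hypothetical name):

```java
public class BranchlessGroupBy {
    // Dense group-by with branch-free accumulation: the range predicate is
    // converted to 0.0/1.0 and multiplied in, so non-matching rows add zero.
    static double[] groupSumCount(long[] keys, double[] vals,
                                  double lo, double hi, int maxKey) {
        int accSize = 2;                              // [sum, count] per key
        double[] accs = new double[maxKey * accSize];
        for (int i = 0; i < keys.length; i++) {
            double match = (vals[i] >= lo && vals[i] < hi) ? 1.0 : 0.0;
            int base = (int) keys[i] * accSize;
            accs[base]     += vals[i] * match;        // sum
            accs[base + 1] += match;                  // count
        }
        return accs;
    }
}
```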
For high-cardinality groups (>200K), Stratum uses an open-addressed hash table with Fibonacci hashing:
```java
// Hash table: flat long[] keys + double[] accumulators
long[] htKeys = new long[capacity];              // filled with EMPTY_KEY sentinel
double[] htAccs = new double[capacity * accSize];

// Fibonacci hashing: top bits of key * 2^64/φ index the power-of-two table
long hash = key * 0x9E3779B97F4A7C15L;
int slot = (int) (hash >>> shift);               // shift = 64 - log2(capacity)

// Linear probing with wrap-around
while (htKeys[slot] != EMPTY_KEY && htKeys[slot] != key) {
    slot = (slot + 1) & (capacity - 1);
}
```
For parallel execution, hash group-by uses radix partitioning (256 partitions):
Each partition is then aggregated with `groupAggregateHashRange` (already JIT-compiled from other paths). Each partition's hash table is ~768KB (fits in L2 cache), versus a single 344MB table. A different hash constant (`0x517CC1B727220A95L`) is used for partitioning than for the per-partition hash tables, to avoid catastrophic collision chains.
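A minimal sketch of the 256-way partitioning step, using the partitioning constant quoted above (the class and method names are hypothetical):

```java
public class RadixPartition {
    // 256-way radix partition on a multiplicative hash of the key. Using a
    // different multiplier than the per-partition table's Fibonacci constant
    // keeps the two hash functions decorrelated.
    static int partitionOf(long key) {
        return (int) ((key * 0x517CC1B727220A95L) >>> 56);  // top 8 bits → [0, 256)
    }
}
```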
fusedSimdMultiSumParallel processes up to 4 SUM/SUM_PRODUCT/COUNT/AVG aggregations in a single pass. The same predicate mask is applied to all accumulators.
When all aggregation columns are long[], the fusedSimdMultiSumAllLongParallel path (in ColumnOpsExt) accumulates using LongVector.add() instead of converting to double first. This avoids two full-array longToDouble conversion passes. Conversion to double happens only at reduceLanes().
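The all-long idea can be sketched as follows (illustrative, not the actual ColumnOpsExt code; `LongSum` is a hypothetical name): the accumulator stays in long lanes for the whole loop, and the long→double conversion happens exactly once, after the horizontal reduction.

```java
import jdk.incubator.vector.LongVector;
import jdk.incubator.vector.VectorOperators;
import jdk.incubator.vector.VectorSpecies;

public class LongSum {
    static final VectorSpecies<Long> S = LongVector.SPECIES_PREFERRED;

    // Accumulate entirely in long lanes; convert to double only at the end,
    // avoiding a per-element long->double conversion pass.
    static double sumAsDouble(long[] a) {
        LongVector acc = LongVector.zero(S);
        int upper = S.loopBound(a.length);
        int i = 0;
        for (; i < upper; i += S.length()) {
            acc = acc.add(LongVector.fromArray(S, a, i));
        }
        long sum = acc.reduceLanes(VectorOperators.ADD);
        for (; i < a.length; i++) sum += a[i];  // scalar tail
        return (double) sum;                     // single conversion
    }
}
```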
fusedFilterGroupAggregateDenseVarParallel (ColumnOpsExt) uses variable-width accumulators:
- `[sum, sum_sq, count]` (variance-style aggregates)
- `[sum_x, sum_y, sum_xy, sum_xx, sum_yy, count]` (correlation/regression)

All slots are additive, enabling simple parallel merge. Final computation (Welford formula, Pearson coefficient) happens in Clojure decode.
For PersistentColumnIndex inputs, fusedGroupAggregateDenseChunkedParallel streams over chunks without materializing the full array. Each 8192-element chunk is copied from native memory into a temporary 64KB array (L2-resident), scattered into accumulators, then discarded. This is 2.5x faster than the array path for compound aggregations.
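The chunk-streaming shape can be sketched as follows (a simplified illustration; `ChunkSource` and `copyChunk` are hypothetical stand-ins for the native-memory chunk reader):

```java
public class ChunkedSum {
    static final int CHUNK = 8192;  // elements per chunk (64KB of doubles)

    // Hypothetical reader abstraction over chunked/native storage.
    interface ChunkSource {
        int length();
        void copyChunk(int start, double[] dst, int len);
    }

    // Stream over the source via a small reusable buffer so the working set
    // stays L2-resident instead of materializing the full array.
    static double sum(ChunkSource src) {
        double[] buf = new double[CHUNK];
        double s = 0;
        for (int start = 0; start < src.length(); start += CHUNK) {
            int len = Math.min(CHUNK, src.length() - start);
            src.copyChunk(start, buf, len);
            for (int i = 0; i < len; i++) s += buf[i];
        }
        return s;
    }
}
```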
arrayStringLikeFast (ColumnOpsExt) recognizes three common LIKE patterns and avoids full regex:
- `%literal%` → `String.contains()`
- `prefix%` → `String.startsWith()`
- `%suffix` → `String.endsWith()`

For dictionary-encoded columns with >100K entries, the dictionary is filtered in parallel, then a bitset of matching codes is checked per row.
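The three fast paths amount to classifying the pattern by its leading/trailing `%` (a sketch, assuming well-formed patterns; `LikeFast.likeFast` is a hypothetical name, and patterns with interior wildcards would fall through to the full regex route, represented here by an exception):

```java
public class LikeFast {
    // Classify a simple LIKE pattern into a String fast path.
    static boolean likeFast(String s, String pattern) {
        boolean leading = pattern.startsWith("%");
        boolean trailing = pattern.length() > 1 && pattern.endsWith("%");
        String body = pattern.substring(leading ? 1 : 0,
                                        pattern.length() - (trailing ? 1 : 0));
        if (body.indexOf('%') >= 0 || body.indexOf('_') >= 0)
            throw new IllegalArgumentException("needs full regex: " + pattern);
        if (leading && trailing) return s.contains(body);    // %literal%
        if (trailing)            return s.startsWith(body);  // prefix%
        if (leading)             return s.endsWith(body);    // %suffix
        return s.equals(body);                               // no wildcards
    }
}
```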
Hard-won rules from extensive benchmarking:
- **Class size matters.** Adding bytecode to a class file degrades JIT quality of all methods in that class, even unrelated ones. Keep classes under ~80KB of bytecode. ColumnOpsExt exists for this reason.
- **Avoid switches in hot loops.** Inner loops with `switch (aggType)` prevent SIMD vectorization (106ms vs 16ms). Use separate methods for each aggregation type.
- **`Object[][]` kills specialization.** The JIT can't type-specialize through `Object[][]` — every `(long[]) arr[p][c]` cast prevents bounds-check hoisting. Use typed arrays (`long[][][]`, `double[][][]`).
- **`LongVector.convertShape` is not intrinsified.** `convertShape(L2D, ...)` caused an 8x regression. Convert at `reduceLanes` only.
- **Shared compilation units.** If `foo()` and `fooRange(start, end)` both exist, the JIT compiles them independently. Make `foo()` delegate to `fooRange(0, length)`.
- **Separate JIT profiles.** Different aggregation types (COUNT vs SUM) need separate methods. Mixing them causes UnreachedCode speculation failures and deoptimization cascades.
- **Method size limits.** Keep hot methods compact. Code bloat from unrolling (e.g., 8 predicates) triggers deoptimization.
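The shared-compilation-unit rule is easiest to see as a minimal pair (hypothetical names): a single compiled body serves both entry points, so warming either one warms both.

```java
public class SharedCompile {
    // Full-array overload delegates to the range version: the JIT compiles
    // one hot body instead of two independently profiled copies.
    static double sum(double[] a) {
        return sumRange(a, 0, a.length);
    }

    static double sumRange(double[] a, int start, int end) {
        double s = 0;
        for (int i = start; i < end; i++) s += a[i];
        return s;
    }
}
```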