Differential dataflow for XTDB - incrementally maintain query results without re-execution.
XTFlow is a differential dataflow engine for XTDB that incrementally updates query results as data changes. Instead of re-running queries on every transaction, XTFlow propagates changes (deltas) through an operator graph to compute precise updates.
Key Features:
Add to deps.edn:
Via Git:
{:deps {org.kipz/xtflow {:git/url "https://github.com/kipz/xtflow"
:git/sha "..."}}}
Via Maven/Clojars:
{:deps {org.kipz/xtflow {:mvn/version "x.y.z"}}}
Register a query with callback:
(require '[xtflow.core :as flow]
'[xtdb.api :as xt])
(def xtdb-client (xt/start-node {}))
(flow/register-query!
{:query-id "my-query"
:xtql "(-> (from :users [name age])
(where (> age 18))
(aggregate {:count (row-count)}))"
:callback (fn [changes]
(println "Total adults:"
(-> changes :modified first :new :count)))})
Execute transactions - callbacks fire automatically:
(flow/execute-tx! xtdb-client
[[:put-docs :users
{:xt/id "user-1" :name "Alice" :age 25}]])
;; Output: Total adults: 1
XTFlow uses differential dataflow:
XTFlow supports 16 of 17 XTDB v2 XTQL operators (94% coverage):
from - Sources data from a table with field projectionrel - Sources data from inline relation literalsunify - Combines multiple input sources using Datalog-style unificationwhere - Filters rows using predicates (=, >, <, >=, <=, like, and, or, not)with - Adds computed fields with deep nested access (..)without - Removes specified fields from documentsreturn - Projects only specified fieldsunnest - Flattens arrays into individual rowsaggregate - Groups and aggregates with count, sum, min, max, avglimit - Returns top N rowsoffset - Skips first N rowsorder-by - Not yet supported (rows returned in arbitrary order)join - Inner join between two input streamsleft-join - Left outer join (preserves unmatched left rows)exists - Boolean check if subquery returns results (with TTL caching)pull - Retrieves single related row as nested mappull* - Retrieves multiple related rows as arrayAll operators support incremental updates with full differential dataflow semantics.
See examples/ directory for:
basic.clj - Simple usage patternse2e_callbacks.clj - Comprehensive callback demonstrationsrepl_usage.clj - REPL experimentationMIT License - see LICENSE file
Can you improve this documentation?Edit on GitHub
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |