An implementation of the Raft consensus algorithm in Clojure. This library provides distributed consensus with pluggable components for networking, state machines, and persistence.
{:deps {com.fluree/raft {:mvn/version "1.0.0-beta1"}}}
Or use a Git dependency for the latest development version:
{:deps {fluree/raft {:git/url "https://github.com/fluree/raft"
                     :git/sha "904d915"}}} ; use the latest commit SHA
This Raft implementation has been validated with Jepsen for correctness and performance.
- Consistency Validation
- Three Test Environments
- Complete Test Guide: setup, configuration, and test scenarios
Here's a minimal example of setting up a single-node Raft instance:
(ns my-app.core
  (:require [fluree.raft :as raft]))

;; Track application state
(def app-state (atom {}))

;; Define your state machine
;; Note: the state machine receives the log entry and the current raft state
(defn state-machine [entry raft-state]
  ;; Process the entry and update application state
  (swap! app-state
         (fn [state]
           (case (:op entry)
             :set    (assoc state (:key entry) (:value entry))
             :delete (dissoc state (:key entry))
             state)))
  ;; Return result for the command callback
  true)
;; Configure and start a Raft node
(def config
  {:servers ["server1"]
   :this-server "server1" ; Required: identifies this node
   :send-rpc-fn (fn [server msg callback]
                  ;; For a single node, invoke the callback with nil
                  (when callback (callback nil)))
   :leader-change-fn (fn [event]
                       (println "Leader changed to:" (:new-leader event)))
   :log-directory "/var/raft/logs"
   :state-machine state-machine
   ;; Required snapshot functions
   :snapshot-write (fn [file state]
                     (spit file (pr-str state)))
   :snapshot-reify (fn [] @app-state)
   :snapshot-install (fn [snapshot index]
                       (reset! app-state snapshot))
   :snapshot-xfer (fn [snapshot server]
                    ;; Transfer snapshot to another server
                    ;; For a single node, this is a no-op
                    nil)
   :snapshot-list-indexes (fn [dir]
                            ;; Return a list of available snapshot indexes
                            [])})
;; Start the Raft instance
(def raft-instance (raft/start config))

;; Submit a command (only works on the leader)
;; new-entry requires a callback function
(raft/new-entry raft-instance
                {:op :set :key "foo" :value "bar"}
                (fn [success?]
                  (println "Entry submitted:" success?))
                5000) ; optional timeout in ms
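Once the entry commits, the state machine has applied it to app-state, so you can inspect the atom directly and shut the node down when finished:
;; Inspect the applied state (after the :set above commits)
@app-state ; => {"foo" "bar"}

;; Cleanly shut down the node
(raft/close raft-instance)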
For a real distributed system, you'll need to implement the networking layer:
;; Example configuration for node 1 of a 3-node cluster
(def config
  {:servers ["server1" "server2" "server3"]
   :this-server "server1"
   :send-rpc-fn my-network-send-fn ; Your RPC implementation
   :leader-change-fn handle-leader-change
   :log-directory "/var/raft/server1/logs"
   :state-machine state-machine
   :heartbeat-ms 150 ; Leader heartbeat interval
   :timeout-ms 300   ; Election timeout
   ;; Snapshot functions (see src/fluree/raft/kv_example.clj for a full implementation)
   :snapshot-write snapshot-writer-fn
   :snapshot-reify snapshot-reify-fn
   :snapshot-install snapshot-install-fn
   :snapshot-xfer snapshot-transfer-fn
   :snapshot-list-indexes snapshot-list-fn})
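The library leaves the transport up to you, so my-network-send-fn above is yours to implement. Here is a minimal sketch, assuming a hypothetical transport/send! helper that delivers msg to the named server and returns the peer's response (or throws on failure); any HTTP, TCP, or core.async based transport with the same shape would work:
(defn my-network-send-fn
  "Send a Raft RPC to `server` and deliver the peer's response via `callback`."
  [server msg callback]
  (future
    (try
      ;; `transport/send!` is a placeholder for your own networking code.
      (let [response (transport/send! server msg)]
        (when callback (callback response)))
      (catch Exception _
        ;; On network failure, report no response so Raft can retry or re-elect.
        (when callback (callback nil))))))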
The Raft configuration map requires the following options:
{:servers ["server1" "server2" "server3"] ; List of all servers in cluster
:this-server "server1" ; This server's ID (must be in servers list)
:send-rpc-fn (fn [server msg callback] ...) ; Network layer for sending messages
:state-machine (fn [entry raft-state] ...) ; Your state machine function
;; Required snapshot functions
:snapshot-write (fn [file state] ...) ; Write state to snapshot file
:snapshot-reify (fn [] ...) ; Create snapshot from current state
:snapshot-install (fn [snapshot index] ...) ; Install received snapshot
:snapshot-xfer (fn [snapshot server] ...)} ; Transfer snapshot to server
{:log-directory "raftlog/" ; Directory for logs (default: "raftlog/")
:leader-change-fn (fn [event] ...) ; Callback for leadership changes
:close-fn (fn [] ...) ; Cleanup function on shutdown
:snapshot-threshold 100 ; Entries before creating snapshot (default: 100)
:heartbeat-ms 100 ; Leader heartbeat interval in ms (default: 100)
:timeout-ms 500 ; Election timeout in ms (default: 500)
:log-history 10 ; Number of old log files to keep (default: 10)
:entries-max 50 ; Max entries to send at once (default: 50)
:default-command-timeout 4000 ; Default timeout for commands in ms (default: 4000)
:catch-up-rounds 10 ; Rounds to attempt catching up (default: 10)
:entry-cache-size nil ; Size of entry cache (optional)
;; Additional snapshot-related options
:snapshot-list-indexes (fn [dir] ...) ; List available snapshots
;; Advanced options (rarely needed)
:event-chan async/chan ; Custom event channel
:command-chan async/chan} ; Custom command channel
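In practice the required and optional keys go into a single map handed to raft/start. For example, assuming hypothetical required-options and optional-options maps shaped like the two shown above:
;; One combined configuration map (the names below are illustrative)
(def raft-instance
  (raft/start (merge required-options optional-options)))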
This implementation follows the Raft specification. The core components are:
- raft.clj: Main event loop handling all state transitions
- log.clj: Persistent, rotating log storage with corruption recovery
- leader.clj: Replication, heartbeats, and commitment
- events.clj: Common handlers for all server states
- watch.clj: Leadership change notifications

raft/start
Starts a new Raft instance with the given configuration.
(raft/start config) ; => RaftInstance
raft/new-entry
Submits a new entry to the Raft log (leader only). Requires a callback function.
;; With the default command timeout (see :default-command-timeout)
(raft/new-entry raft-instance entry callback)

;; With a custom timeout
(raft/new-entry raft-instance entry callback timeout-ms)
;; Example:
(raft/new-entry raft-instance
                {:op :set :key "foo" :value "bar"}
                (fn [success?]
                  (if success?
                    (println "Entry committed")
                    (println "Entry failed")))
                3000)
raft/invoke-rpc
General RPC invocation mechanism. Used for operations like adding/removing servers.
;; Add a server to the cluster (leader only)
(raft/invoke-rpc raft-instance :add-server "server4" callback-fn)
;; Remove a server from the cluster (leader only)
(raft/invoke-rpc raft-instance :remove-server "server2" callback-fn)
raft/get-raft-state
Get the current Raft state asynchronously. Useful for debugging.
(raft/get-raft-state raft-instance
                     (fn [state]
                       (println "Current state:" (:status state))))
raft/add-leader-watch
Watch for leadership changes.
(raft/add-leader-watch raft-instance
                       :my-watch-key
                       (fn [leader-info]
                         (println "New leader:" leader-info)))
raft/close
Cleanly shut down a Raft instance.
(raft/close raft-instance)
raft/monitor-raft
Debugging tool to monitor all Raft events.
;; Register a monitoring function
(raft/monitor-raft raft-instance
(fn [event]
(println "Raft event:" event)))
;; Remove monitoring
(raft/monitor-raft raft-instance nil)
raft/remove-leader-watch
Remove a previously registered leader watch.
(raft/remove-leader-watch raft-instance :my-watch-key)
Your state machine function receives the entry and the current raft state:
(defn state-machine
  [entry raft-state]
  ;; Process the entry and return a result.
  ;; The result is passed to the command callback.
  ;; Typically you maintain application state in an atom and return success/failure.
  result)
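As a concrete sketch in the same style as the Quick Start example, the key-value state machine below keeps its state in an atom (kv-state, a name introduced here) and returns the read value for :get commands so the command callback receives it:
(def kv-state (atom {}))

(defn kv-state-machine
  [entry _raft-state]
  (case (:op entry)
    ;; Writes mutate the atom and report success to the callback
    :set    (do (swap! kv-state assoc (:key entry) (:value entry)) true)
    :delete (do (swap! kv-state dissoc (:key entry)) true)
    ;; Reads return the current value, which is handed to the callback
    :get    (get @kv-state (:key entry))
    ;; Unknown operations report failure
    false))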
Implement these functions for snapshot support:
{:snapshot-write (fn [file state] ...)       ; Write state to a snapshot file
 :snapshot-reify (fn [] current-state)       ; Create a snapshot from current state
 :snapshot-install (fn [snapshot index] ...) ; Install a received snapshot
 :snapshot-xfer (fn [snapshot server] ...)}  ; Transfer a snapshot to another server
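For reference, here is a minimal file-based sketch of these functions, reusing the kv-state atom from the sketch above and the signatures shown in this document. The "<index>.snapshot" file naming in :snapshot-list-indexes is a hypothetical convention, and :snapshot-xfer is left as a no-op where a real cluster would ship the file over its transport:
(require '[clojure.java.io :as io])

(def snapshot-fns
  {:snapshot-write        (fn [file state]
                            ;; Serialize the snapshot state to the given file
                            (spit file (pr-str state)))
   :snapshot-reify        (fn []
                            ;; Capture the current application state
                            @kv-state)
   :snapshot-install      (fn [snapshot _index]
                            ;; Replace local state with the received snapshot
                            (reset! kv-state snapshot))
   :snapshot-xfer         (fn [_snapshot _server]
                            ;; No-op here; a real cluster would send the snapshot
                            ;; file to the target server over your transport
                            nil)
   :snapshot-list-indexes (fn [dir]
                            ;; Hypothetical layout: files named "<index>.snapshot"
                            (->> (.listFiles (io/file dir))
                                 (keep #(second (re-matches #"(\d+)\.snapshot" (.getName %))))
                                 (map #(Long/parseLong %))
                                 sort
                                 vec))})
These functions can then be merged into the Raft configuration map alongside the other required options.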
See src/fluree/raft/kv_example.clj for a complete example that implements a distributed key-value store.
To be notified of leadership changes, supply a :leader-change-fn; it receives an event map containing the old and new leader:
(def config
  {:leader-change-fn (fn [event]
                       (println "Leader changed from" (:old-leader event)
                                "to" (:new-leader event)
                                "Event:" (:event event)))
   ;; ... other config
   })
# Download dependencies
make deps
# Run tests
make test
# Build JAR
make jar
# Run linting
make clj-kondo
# Generate coverage report
make coverage
# Check for outdated dependencies
make ancient
# Run all tests
make test
# Run with specific test selectors
clojure -M:test -i integration
The project includes a Dockerfile for containerized deployment:
# Build Docker image
docker build -t fluree/raft .
# Run with volume for logs
docker run -v /var/raft:/var/raft fluree/raft
Log Corruption
Leader Election Failures
Performance Tuning
{:snapshot-threshold 5000 ; Increase for write-heavy workloads
 :heartbeat-ms 50         ; Decrease for faster failure detection
 :timeout-ms 150          ; Decrease for quicker elections
 :entries-max 100}        ; Increase for better throughput
Enable debug logging by configuring your logging framework appropriately.
Copyright © 2018-2025 Fluree PBC
Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.