An implementation of the Raft consensus algorithm in Clojure. This library provides distributed consensus with pluggable components for networking, state machines, and persistence.
{:deps {com.fluree/raft {:mvn/version "1.0.0-beta1"}}}
Using Git dependency (for latest development version)
{:deps {fluree/raft {:git/url "https://github.com/fluree/raft"
                     :git/sha "904d915"}}} ; use latest commit SHA
This Raft implementation has been validated with Jepsen for correctness and performance.
Consistency Validation:
Three Test Environments:
Complete Test Guide - Setup, configuration, and test scenarios
Here's a minimal example of setting up a single-node Raft instance:
(ns my-app.core
  (:require [fluree.raft :as raft]))
;; Track application state
(def app-state (atom {}))
;; Define your state machine
;; Note: state machine receives entry and raft-state params
(defn state-machine [entry raft-state]
  ;; Process the entry and update application state
  (swap! app-state
         (fn [state]
           (case (:op entry)
             :set (assoc state (:key entry) (:value entry))
             :delete (dissoc state (:key entry))
             state)))
  ;; Return result for the command callback
  true)
;; Configure and start a Raft node
(def config
  {:servers          ["server1"]
   :this-server      "server1"  ; Required: identifies this node
   :send-rpc-fn      (fn [server msg callback] 
                       ;; For single node, invoke callback with nil
                       (when callback (callback nil)))
   :leader-change-fn (fn [event] 
                       (println "Leader changed to:" (:new-leader event)))
   :log-directory    "/var/raft/logs"
   :state-machine    state-machine
   
   ;; Required snapshot functions
   :snapshot-write   (fn [file state] 
                       (spit file (pr-str state)))
   :snapshot-reify   (fn [] @app-state)
   :snapshot-install (fn [snapshot index] 
                       (reset! app-state snapshot))
   :snapshot-xfer    (fn [snapshot server] 
                       ;; Transfer snapshot to another server
                       ;; For single node, this is a no-op
                       nil)
   :snapshot-list-indexes (fn [dir] 
                            ;; Return list of available snapshot indexes
                            [])})
;; Start the Raft instance
(def raft-instance (raft/start config))
;; Submit a command (only works on the leader)
;; new-entry requires a callback function
(raft/new-entry raft-instance 
                {:op :set :key "foo" :value "bar"}
                (fn [success?] 
                  (println "Entry submitted:" success?))
                5000) ; optional timeout in ms
For a real distributed system, you'll need to implement the networking layer:
;; Example configuration for node 1 of a 3-node cluster
(def config
  {:servers          ["server1" "server2" "server3"]
   :this-server      "server1"
   :send-rpc-fn      my-network-send-fn  ; Your RPC implementation
   :leader-change-fn handle-leader-change
   :log-directory    "/var/raft/server1/logs"
   :state-machine    state-machine
   :heartbeat-ms     150    ; Leader heartbeat interval
   :timeout-ms       300    ; Election timeout
   
   ;; Snapshot functions (see src/fluree/raft/kv_example.clj for full implementation)
   :snapshot-write   snapshot-writer-fn
   :snapshot-reify   snapshot-reify-fn
   :snapshot-install snapshot-install-fn
   :snapshot-xfer    snapshot-transfer-fn
   :snapshot-list-indexes snapshot-list-fn})
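While developing, it can help to stand in for the network with an in-process registry before writing a real transport. The following is a minimal sketch, not the library's networking layer: `handlers`, `register-handler!`, and `in-process-send-rpc` are hypothetical names, the message is treated as an opaque value, and replying with `nil` for an unknown server is an assumption modeled on the single-node example. How a receiving node hands the message to its own Raft instance is left to the handler you register.

```clojure
(def handlers
  "Registry of server-id -> (fn [msg respond]), used as a stand-in network."
  (atom {}))

(defn register-handler!
  "Associates `handler` with `server-id` in the stand-in network."
  [server-id handler]
  (swap! handlers assoc server-id handler))

(defn in-process-send-rpc
  "Has the (fn [server msg callback]) shape used for :send-rpc-fn.
  Delivers `msg` to the registered handler; replies with nil when the
  target server is unknown (an assumption, see the text above)."
  [server msg callback]
  (let [respond (fn [response] (when callback (callback response)))]
    (if-let [handler (get @handlers server)]
      (handler msg respond)
      (respond nil))
    nil))

;; Try it with an echo handler (the message content here is made up):
(register-handler! "server2" (fn [msg respond] (respond {:echo msg})))
(def reply (promise))
(in-process-send-rpc "server2" {:ping 1} #(deliver reply %))
@reply ;; => {:echo {:ping 1}}
```

A real implementation would serialize the message, send it over TCP or HTTP, and invoke the callback when the response arrives.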
The Raft configuration map requires the following options:
{:servers          ["server1" "server2" "server3"]  ; List of all servers in cluster
 :this-server      "server1"                        ; This server's ID (must be in servers list)
 :send-rpc-fn      (fn [server msg callback] ...)   ; Network layer for sending messages
 :state-machine    (fn [entry raft-state] ...)      ; Your state machine function
 
 ;; Required snapshot functions
 :snapshot-write   (fn [file state] ...)            ; Write state to snapshot file
 :snapshot-reify   (fn [] ...)                      ; Create snapshot from current state
 :snapshot-install (fn [snapshot index] ...)        ; Install received snapshot
 :snapshot-xfer    (fn [snapshot server] ...)}      ; Transfer snapshot to server
The following options are optional:
{:log-directory         "raftlog/"           ; Directory for logs (default: "raftlog/")
 :leader-change-fn      (fn [event] ...)      ; Callback for leadership changes
 :close-fn              (fn [] ...)           ; Cleanup function on shutdown
 :snapshot-threshold    100                   ; Entries before creating snapshot (default: 100)
 :heartbeat-ms          100                   ; Leader heartbeat interval in ms (default: 100)
 :timeout-ms            500                   ; Election timeout in ms (default: 500)
 :log-history           10                    ; Number of old log files to keep (default: 10)
 :entries-max           50                    ; Max entries to send at once (default: 50)
 :default-command-timeout 4000               ; Default timeout for commands in ms (default: 4000)
 :catch-up-rounds       10                    ; Rounds to attempt catching up (default: 10)
 :entry-cache-size      nil                  ; Size of entry cache (optional)
 
 ;; Additional snapshot-related options
 :snapshot-list-indexes (fn [dir] ...)       ; List available snapshots
 
 ;; Advanced options (rarely needed)
 :event-chan            (async/chan)         ; Custom event channel
 :command-chan          (async/chan)}        ; Custom command channel
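A configuration map can be sanity-checked before it is passed to `raft/start`. The sketch below is an illustration rather than something the library provides: `required-keys`, `documented-defaults`, and `config-problems` are hypothetical names, and the key names and default values are copied from the lists above. The heartbeat check reflects general Raft guidance that heartbeats must be sent more often than the election timeout fires.

```clojure
(require '[clojure.set :as set])

(def required-keys
  #{:servers :this-server :send-rpc-fn :state-machine
    :snapshot-write :snapshot-reify :snapshot-install :snapshot-xfer})

(def documented-defaults
  {:log-directory           "raftlog/"
   :snapshot-threshold      100
   :heartbeat-ms            100
   :timeout-ms              500
   :log-history             10
   :entries-max             50
   :default-command-timeout 4000
   :catch-up-rounds         10})

(defn config-problems
  "Returns a vector of human-readable problems; empty when none are found."
  [config]
  (let [cfg     (merge documented-defaults config)
        missing (set/difference required-keys (set (keys config)))]
    (cond-> []
      (seq missing)
      (conj (str "missing required keys: " (vec (sort missing))))

      (not (some #{(:this-server cfg)} (:servers cfg)))
      (conj ":this-server must appear in :servers")

      (>= (:heartbeat-ms cfg) (:timeout-ms cfg))
      (conj ":heartbeat-ms should be lower than :timeout-ms"))))

;; Placeholder functions keep the example self-contained.
(def example-config
  {:servers ["server1" "server2" "server3"]
   :this-server "server1"
   :send-rpc-fn (fn [server msg callback] nil)
   :state-machine (fn [entry raft-state] true)
   :snapshot-write (fn [file state] nil)
   :snapshot-reify (fn [] {})
   :snapshot-install (fn [snapshot index] nil)
   :snapshot-xfer (fn [snapshot server] nil)})

(config-problems example-config) ;; => []
```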
This implementation follows the Raft specification, providing:
raft.clj: Main event loop handling all state transitions
log.clj: Persistent, rotating log storage with corruption recovery
leader.clj: Replication, heartbeats, and commitment
events.clj: Common handlers for all server states
watch.clj: Leadership change notifications
raft/start
Starts a new Raft instance with the given configuration.
(raft/start config) ; => RaftInstance
raft/new-entry
Submits a new entry to the Raft log (leader only). Requires a callback function.
;; With default timeout (5000ms)
(raft/new-entry raft-instance entry callback)
;; With custom timeout
(raft/new-entry raft-instance entry callback timeout-ms)
;; Example:
(raft/new-entry raft-instance 
                {:op :set :key "foo" :value "bar"}
                (fn [success?] 
                  (if success?
                    (println "Entry committed")
                    (println "Entry failed")))
                3000)
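Because `raft/new-entry` reports its outcome through a callback, callers that want a blocking call can bridge it with a promise. This is a minimal sketch under stated assumptions: `submit-and-wait` is a hypothetical helper, it takes any two-argument submit function so it can be tried without a cluster, and its `wait-ms` is independent of the timeout passed to `raft/new-entry` itself.

```clojure
(defn submit-and-wait
  "Calls `submit-fn` with `entry` and a callback, then blocks until the
  callback fires or `wait-ms` elapses. Returns the callback's argument,
  or :timed-out if nothing arrived in time."
  [submit-fn entry wait-ms]
  (let [result (promise)]
    (submit-fn entry (fn [outcome] (deliver result outcome)))
    (deref result wait-ms :timed-out)))

;; Stand-in submit functions for trying the helper without a cluster:
(defn fake-submit [_entry callback] (callback true))
(defn silent-submit [_entry _callback] nil)

(submit-and-wait fake-submit {:op :set :key "foo" :value "bar"} 1000) ;; => true
(submit-and-wait silent-submit {:op :set :key "foo" :value "bar"} 50) ;; => :timed-out

;; With a running instance, the submit function might look like:
;; (fn [entry callback] (raft/new-entry raft-instance entry callback))
```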
raft/invoke-rpc
General RPC invocation mechanism. Used for operations like adding/removing servers.
;; Add a server to the cluster (leader only)
(raft/invoke-rpc raft-instance :add-server "server4" callback-fn)
;; Remove a server from the cluster (leader only)
(raft/invoke-rpc raft-instance :remove-server "server2" callback-fn)
raft/get-raft-state
Get the current Raft state asynchronously. Useful for debugging.
(raft/get-raft-state raft-instance 
                     (fn [state] 
                       (println "Current state:" (:status state))))
raft/add-leader-watch
Watch for leadership changes.
(raft/add-leader-watch raft-instance 
                       :my-watch-key
                       (fn [leader-info]
                         (println "New leader:" leader-info)))
raft/close
Cleanly shut down a Raft instance.
(raft/close raft-instance)
raft/monitor-raft
Debugging tool to monitor all Raft events.
;; Register a monitoring function
(raft/monitor-raft raft-instance 
                   (fn [event] 
                     (println "Raft event:" event)))
;; Remove monitoring
(raft/monitor-raft raft-instance nil)
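Printing every event can be noisy, so a monitoring function might instead keep only the most recent events in memory. The sketch below assumes nothing about the event contents: `make-event-recorder` is a hypothetical helper, and the `{:seq n}` maps in the demonstration are invented stand-ins for real events.

```clojure
(defn make-event-recorder
  "Returns {:events atom, :record! f}. Calling f with an event appends it
  and drops the oldest one once more than `limit` events are held."
  [limit]
  (let [events (atom clojure.lang.PersistentQueue/EMPTY)]
    {:events  events
     :record! (fn [event]
                (swap! events
                       (fn [q]
                         (let [q (conj q event)]
                           (if (> (count q) limit) (pop q) q)))))}))

;; Demonstration with made-up events:
(def recorder (make-event-recorder 3))
(doseq [n (range 5)]
  ((:record! recorder) {:seq n}))
(vec @(:events recorder)) ;; => [{:seq 2} {:seq 3} {:seq 4}]

;; With a running instance, it could be registered as:
;; (raft/monitor-raft raft-instance (:record! recorder))
```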
raft/remove-leader-watch
Remove a previously registered leader watch.
(raft/remove-leader-watch raft-instance :my-watch-key)
Your state machine function receives the entry and the current raft state:
(defn state-machine 
  [entry raft-state]
  ;; Process the entry (typically by updating state held in an atom)
  ;; and return a result. The result is passed to the command callback.
  (let [result true] ; placeholder: replace with your own success/failure value
    result))
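One way to keep a state machine easy to test is to separate the pure transition from the side effect. The sketch below is a suggestion for structuring your own code, not part of the library: `apply-entry` and `make-state-machine` are hypothetical names, and the `:op`/`:key`/`:value` entry shape is borrowed from the quick-start example.

```clojure
(defn apply-entry
  "Pure transition: returns the next state for a given entry."
  [state entry]
  (case (:op entry)
    :set    (assoc state (:key entry) (:value entry))
    :delete (dissoc state (:key entry))
    state)) ; unknown ops leave the state unchanged

(defn make-state-machine
  "Returns a (fn [entry raft-state]) that applies entries to `state-atom`.
  Its return value is what the command callback would receive."
  [state-atom]
  (fn [entry _raft-state]
    (swap! state-atom apply-entry entry)
    true))

;; Exercise it at the REPL without starting Raft:
(def demo-state (atom {}))
(def demo-sm (make-state-machine demo-state))
(demo-sm {:op :set :key "foo" :value "bar"} nil)
(demo-sm {:op :set :key "baz" :value 1} nil)
(demo-sm {:op :delete :key "baz"} nil)
@demo-state ;; => {"foo" "bar"}
```

Keeping `apply-entry` pure also means the same function can be reused when rebuilding state from a snapshot plus later entries.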
Implement these functions for snapshot support:
{:snapshot-write   (fn [file state] ...)         ; Write state to snapshot file
 :snapshot-reify   (fn [] current-state)         ; Create snapshot from current state
 :snapshot-install (fn [snapshot index] ...)     ; Install received snapshot
 :snapshot-xfer    (fn [snapshot server] ...)}   ; Transfer snapshot to server
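For state that is plain Clojure data, a snapshot can be written as EDN and read back with `clojure.edn`. This is a minimal sketch, assuming the state contains only EDN-readable values: `write-snapshot` mirrors the `(fn [file state])` shape of `:snapshot-write`, while `read-snapshot` is a hypothetical counterpart whose place in your `:snapshot-install` logic is up to you.

```clojure
(require '[clojure.edn :as edn])

(defn write-snapshot
  "Same shape as :snapshot-write: serializes `state` to `file` as EDN."
  [file state]
  (spit file (pr-str state)))

(defn read-snapshot
  "Hypothetical counterpart: reads the EDN in `file` back into a value."
  [file]
  (edn/read-string (slurp file)))

;; Round-trip through a temporary file:
(def round-tripped
  (let [tmp (java.io.File/createTempFile "snapshot" ".edn")]
    (try
      (write-snapshot tmp {"foo" "bar" :count 2})
      (read-snapshot tmp)
      (finally
        (.delete tmp)))))

round-tripped ;; => {"foo" "bar", :count 2}
```

Larger states would likely call for a streaming or binary format; see src/fluree/raft/kv_example.clj for the project's own approach.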
See src/fluree/raft/kv_example.clj for a complete example implementing a
distributed key-value store. This example demonstrates:
(def config
  {:leader-change-fn (fn [event]
                       (println "Leader changed from" (:old-leader event)
                                "to" (:new-leader event)
                                "Event:" (:event event)))
   ;; ... other config
   })
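A leader-change callback can also record the latest leader so the rest of the application can decide where to send commands. The sketch below relies only on the `:new-leader` key shown above; `current-leader`, `track-leader`, and `leader?` are hypothetical names. The recorded value is this node's local view and may already be stale by the time it is read.

```clojure
(def current-leader
  "Most recent leader reported to this node, or nil if none yet."
  (atom nil))

(defn track-leader
  "Suitable as a :leader-change-fn: remembers the most recent leader."
  [event]
  (reset! current-leader (:new-leader event)))

(defn leader?
  "True when `this-server` matches the last leader this node heard about."
  [this-server]
  (= this-server @current-leader))

;; Simulate an event at the REPL:
(track-leader {:old-leader nil :new-leader "server1"})
(leader? "server1") ;; => true
(leader? "server2") ;; => false
```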
# Download dependencies
make deps
# Run tests
make test
# Build JAR
make jar
# Run linting
make clj-kondo
# Generate coverage report
make coverage
# Check for outdated dependencies
make ancient
# Run all tests
make test
# Run with specific test selectors
clojure -M:test -i integration
The project includes a Dockerfile for containerized deployment:
# Build Docker image
docker build -t fluree/raft .
# Run with volume for logs
docker run -v /var/raft:/var/raft fluree/raft
Log Corruption
Leader Election Failures
Performance Tuning
{:snapshot-threshold 5000      ; Increase for write-heavy workloads
 :heartbeat-ms 50              ; Decrease for faster failure detection
 :timeout-ms 150               ; Decrease for quicker elections
 :entries-max 100}             ; Increase for better throughput
Enable debug logging by configuring your logging framework appropriately.
Copyright © 2018-2025 Fluree PBC
Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.