Liking cljdoc? Tell your friends :D

Cog Town 🏘️

Clojars Project

Build agentic workflows in Clojure with the ergonomics of core.async.

cog.town gives you a tiny set of composable primitives—cogs—stateful, concurrent agents that pass messages over channels.
Think of them as Lego® bricks for conversational or multimodal AI systems.

(ns my.ns
  (require [clojure.core.async :as a]
           [clojure.string :as string]
           [cog.town :as cogs]))

;;; 1. Create some cogs
(def echo
  (cogs/cog [] (fn [ctx msg]
                 (let [resp (str "👋 you said: "  msg)]
                   (-> (conj ctx msg)
                       (conj resp)
                       (vector resp))))))

(def shout
    (cogs/cog [] (fn [ctx msg]
                   (let [resp (clojure.string/upper-case msg)]
                     (-> (conj ctx msg)
                         (conj resp)
                         (vector resp))))))

;;; cogs can be dereferenced to get a live snapshot of their context
@shout
;; => ["hello!", "HELLO!"]

;;; 2. Wire cogs into a flow

(def shout-flow (cogs/flow [echo shout]))
(a/put! shout-flow "hello!")
(a/take! shout-flow println)
(a/close! shout-flow)

;;; 3. Let two cogs talk

(def shout-convo (cogs/dialogue echo shout))
(a/put! shout-convo "hello!")
(a/go-loop []
  (when-some [msg (a/<! shout-convo)]
    (println msg)
    (recur)))

You write pure business logic; Cog Town handles state threading, back‑pressure, and blocking work on dedicated threads.


Core concepts

PrimitiveWhat it isWhen to reach for it
cogBidirectional channel plus private immutable context and a pure transition. Implements ReadPort, WritePort, Channel, Mult, IDeref.Anytime you need an agent that remembers and evolves.
extendLight‑weight wrapper around a cog that adds or re‑routes its I/O without touching core logic.Enrich output (TTS, embeddings) or translate input.
forkClones a cog, optionally transforming its context, IO pair, or transition.Re‑use behaviours with tweaks; create read‑only taps; testing.
flowSequentially connects N channels so each output becomes the next input.Pipelines (ETL, request → AI → TTS, etc.).
fanoutBroadcasts an input value to many channels, gathers ordered results.Scatter‑gather, multi‑tool calls.
gateReleases the value of a latched channel when triggered by input.Throttling, deferred fetches.
dialoguePing‑pong messages between two cogs forever.Multi‑turn agent duets or self‑conversation.

Transition functions

A cog’s transition function must be pure:

transition :: (context, input) → [new‑context, output]
  • Runs on its own real thread (via async/thread) so it is safe to block on HTTP or disk I/O.
  • Throwing emits {:type ::error :throwable th :input input} as output.

Observing & time‑travel

  • Live snapshot – since a cog implements IDeref, you can simply do (@my-cog) to get the latest context value.
  • Audit / replay – Optionally collect the pair [ctx out] in transition functions, storing it in an atom or log. Replaying the log with the pure transition will reproduce the run deterministically (with temperature = 0).
  • Forking – Pick any historical ctx value and feed it back into a new cog for “what‑if” exploration.

Performance & GC notes

  • Contexts ≤ 2 MB updated a few times per second are generally safe.
  • Watch -Xlog:gc* or jstat -gcutil during dev; full GCs should be rare.
  • If context grows (large transcripts, embeddings), store bulky data off‑heap or behind an ID reference and keep lightweight keys in ctx.
  • Cap the length of any time‑travel timeline vector or store deltas to avoid memory blow‑up.

API reference (cheatsheet)

(cog     ctx transition & [buf-or-n xf ex-handler])            => Cog
(fork    cog)                                                  => Cog
(fork    cog ctx-fn)                                           => Cog
(fork    cog ctx-fn io)                                        => Cog
(fork    cog ctx-fn io transition-fn)                          => Cog
(extend  cog io & [transition-fn])                             => Cog

(flow    [ch1 ch2 …] & opts)                                   => IoChannel
(fanout  chs & opts)                                           => IoChannel
(gate    trigger-ch & opts)                                    => IoChannel
(dialogue cogA cogB & opts)                                    => IoChannel

(context cog)   ;; => same as @cog
(:*context cog) ;; => access the context without dereferencing

For detailed doc‑strings run:

(clojure.repl/doc cog.town/cog)

License

MIT © 2025 Brian Scaturro

Can you improve this documentation?Edit on GitHub

cljdoc is a website building & hosting documentation for Clojure/Script libraries

× close