LiteLLM provides built-in support for streaming responses from LLM providers using core.async channels.
Streaming lets you receive response chunks as they are generated rather than waiting for the complete response, which lowers time-to-first-output and keeps interactive applications responsive.
(require '[litellm.core :as core]
         '[litellm.streaming :as streaming]
         '[clojure.core.async :refer [go go-loop <!]])
;; Request streaming response
(def ch (core/completion :openai "gpt-4"
          {:messages [{:role :user :content "Write a story"}]
           :stream true}
          {:api-key "sk-..."}))
;; Consume with go-loop
(go-loop []
  (when-let [chunk (<! ch)]
    (when-let [content (streaming/extract-content chunk)]
      (print content)
      (flush))
    (recur)))
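If you are not inside a go block (for example in a short script), you can consume the same channel with blocking takes instead. A minimal sketch using core.async's <!!:
(require '[clojure.core.async :refer [<!!]])
;; Blocking consumption, suitable for scripts (never call <!! inside a go block)
(loop []
  (when-let [chunk (<!! ch)]
    (when-let [content (streaming/extract-content chunk)]
      (print content)
      (flush))
    (recur)))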
Streaming works the same way through the router API:
(require '[litellm.router :as router])
(router/quick-setup!)
(def ch (router/completion :openai
          {:messages [{:role :user :content "Explain quantum physics"}]
           :stream true}))
(go-loop []
  (when-let [chunk (<! ch)]
    (when-let [content (streaming/extract-content chunk)]
      (print content)
      (flush))
    (recur)))
Extracts the text content from a stream chunk, returning nil for chunks that carry no content (such as metadata chunks).
(streaming/extract-content chunk)
;; => "partial text"
Higher-level API with callbacks for different events.
(streaming/consume-stream-with-callbacks ch
  (fn [chunk]
    ;; Called for each chunk
    (print (streaming/extract-content chunk)))
  (fn [complete-response]
    ;; Called when stream completes
    (println "\nStream complete!"))
  (fn [error]
    ;; Called on error
    (println "Error:" error)))
Collect all chunks into a single response.
(go
  (let [complete-response (<! (streaming/collect-stream ch))]
    (println (core/extract-content complete-response))))
| Provider   | Streaming Support |
|------------|-------------------|
| OpenAI     | ✅ Yes |
| Anthropic  | ✅ Yes |
| Gemini     | ✅ Yes |
| Mistral    | ✅ Yes |
| OpenRouter | ✅ Yes |
| Ollama     | ✅ Yes |
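Because every provider accepts the same :stream true flag, switching providers changes only the provider keyword and config. A sketch against a local Ollama server (the model name and :api-base key are illustrative assumptions, not confirmed API):
(def ollama-ch
  (core/completion :ollama "llama3"           ;; illustrative model name
    {:messages [{:role :user :content "Say hello"}]
     :stream true}
    {:api-base "http://localhost:11434"}))    ;; assumed config key

(go-loop []
  (when-let [chunk (<! ollama-ch)]
    (when-let [content (streaming/extract-content chunk)]
      (print content)
      (flush))
    (recur)))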
(ns my-app.cli
  (:require [litellm.core :as core]
            [litellm.streaming :as streaming]))
(defn ask-streaming [question]
  (let [ch (core/completion :openai "gpt-4"
             {:messages [{:role :user :content question}]
              :stream true}
             {:api-key (System/getenv "OPENAI_API_KEY")})]
    
    (println "Assistant:")
    (streaming/consume-stream-with-callbacks ch
      (fn [chunk]
        (print (streaming/extract-content chunk))
        (flush))
      (fn [_] (println "\n"))
      (fn [error] (println "Error:" error)))))
(ask-streaming "Explain machine learning")
(ns my-app.web
  (:require [litellm.router :as router]
            [litellm.streaming :as streaming]))
(router/quick-setup!)
(defn stream-completion-handler [request]
  (let [question (get-in request [:params :question])
        ch (router/completion :openai
             {:messages [{:role :user :content question}]
              :stream true})]
    
    {:status 200
     :headers {"Content-Type" "text/event-stream"
               "Cache-Control" "no-cache"}
     :body (streaming/->sse-stream ch)}))
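How the SSE body is served depends on your stack; whatever ->sse-stream returns must be a body type your adapter can stream. A hypothetical wiring with the Ring Jetty adapter, for illustration only:
(require '[ring.adapter.jetty :as jetty])

(defn app [request]
  (if (= (:uri request) "/stream")
    (stream-completion-handler request)
    {:status 404 :body "Not found"}))

;; Start a server on port 3000 without blocking the REPL
(comment
  (jetty/run-jetty app {:port 3000 :join? false}))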
(ns my-app.ui
  (:require [litellm.core :as core]
            [litellm.streaming :as streaming]
            [clojure.core.async :refer [go-loop <!]]
            [reagent.core :as r]))
(defn streaming-chat []
  (let [response (r/atom "")]
    (fn []
      [:div
       [:button
        {:on-click
         (fn []
           (let [ch (core/completion :openai "gpt-4"
                      {:messages [{:role :user :content "Tell me a joke"}]
                       :stream true}
                      {:api-key "sk-..."})]
             (reset! response "")
             (go-loop []
               (when-let [chunk (<! ch)]
                 (when-let [content (streaming/extract-content chunk)]
                   (swap! response str content))
                 (recur)))))}
        "Get Joke"]
       [:div @response]])))
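To let the user cancel mid-stream, keep a reference to the channel and close it (see Best Practices below). A sketch with illustrative names:
(require '[clojure.core.async :refer [close!]])

(defonce current-stream (atom nil))

(defn start-stream! [ch]
  (reset! current-stream ch))

(defn cancel-stream! []
  ;; Closing the channel stops delivery of further chunks
  (when-let [ch @current-stream]
    (close! ch)
    (reset! current-stream nil)))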
(require '[clojure.core.async :refer [go <!]])
(defn get-complete-streaming-response [question]
  (let [ch (core/completion :openai "gpt-4"
             {:messages [{:role :user :content question}]
              :stream true}
             {:api-key "sk-..."})]
    ;; Return a channel that yields the complete collected response
    (streaming/collect-stream ch)))
;; Use in async context
(go
  (let [response (<! (get-complete-streaming-response "What is AI?"))]
    (println (core/extract-content response))))
(defn stream-with-metrics [question]
  (let [ch (core/completion :openai "gpt-4"
             {:messages [{:role :user :content question}]
              :stream true}
             {:api-key "sk-..."})
        token-count (atom 0)
        start-time (System/currentTimeMillis)]
    
    (streaming/consume-stream-with-callbacks ch
      (fn [chunk]
        (when-let [content (streaming/extract-content chunk)]
          (swap! token-count + (core/estimate-tokens content))
          (print content)
          (flush)))
      (fn [_]
        (let [duration (- (System/currentTimeMillis) start-time)]
          (println (format "\n\nTokens: %d | Time: %dms | Tokens/sec: %.1f"
                          @token-count
                          duration
                          (/ (* @token-count 1000.0) duration)))))
      (fn [error]
        (println "Error:" error)))))
(require '[clojure.core.async :refer [close! timeout go alts!]])
(defn stream-with-timeout [question max-time-ms]
  (let [ch (core/completion :openai "gpt-4"
             {:messages [{:role :user :content question}]
              :stream true}
             {:api-key "sk-..."})]
    
    (go
      (let [timeout-ch (timeout max-time-ms)]
        (loop []
          (let [[chunk port] (alts! [ch timeout-ch])]
            (cond
              (= port timeout-ch)
              (do
                (close! ch)
                (println "\nTimeout!"))
              
              chunk
              (do
                (when-let [content (streaming/extract-content chunk)]
                  (print content)
                  (flush))
                (recur))
              
              :else
              (println "\nComplete!"))))))))
(require '[litellm.router :as router]
         '[clojure.core.async :refer [<!!]])
(router/register! :fast {:provider :openai :model "gpt-4o-mini" :config {...}})
(router/register! :smart {:provider :anthropic :model "claude-3-opus-20240229" :config {...}})
(defn stream-from-multiple [question]
  (doseq [config [:fast :smart]]
    (println (str "\n=== " (name config) " ===\""))
    (let [ch (router/completion config
               {:messages [{:role :user :content question}]
                :stream true})]
      (<!! (streaming/consume-stream-with-callbacks ch
             (fn [chunk] (print (streaming/extract-content chunk)))
             (fn [_] (println))
             (fn [e] (println "Error:" e)))))))
(require '[litellm.errors :as errors])
(defn safe-stream [question]
  (try
    (let [ch (core/completion :openai "gpt-4"
               {:messages [{:role :user :content question}]
                :stream true}
               {:api-key "sk-..."})]
      
      (streaming/consume-stream-with-callbacks ch
        (fn [chunk] (print (streaming/extract-content chunk)))
        (fn [_] (println "\nSuccess!"))
        (fn [error]
          (if (errors/litellm-error? error)
            (println "LiteLLM Error:" (errors/error-summary error))
            (println "Unknown error:" error)))))
    
    (catch Exception e
      (println "Setup error:" (.getMessage e)))))
;; ❌ Don't do this - no completion handling
(go-loop []
  (when-let [chunk (<! ch)]
    (print (streaming/extract-content chunk))
    (recur)))
;; ✅ Do this - handle completion
(streaming/consume-stream-with-callbacks ch
  on-chunk
  on-complete
  on-error)
;; ✅ Flush after each print
(print (streaming/extract-content chunk))
(flush)
;; ✅ Always provide error callback
(streaming/consume-stream-with-callbacks ch
  (fn [chunk] ...)
  (fn [response] ...)
  (fn [error]
    (log/error "Stream error" error)
    (notify-user "Something went wrong")))
;; ✅ Close channels when done
(when cancelled?
  (close! ch))
If the stream appears to hang, check that the channel is actually being consumed:
;; Make sure you're reading from the channel
(go-loop []
  (when-let [chunk (<! ch)]
    ;; Process chunk
    (recur)))
Some chunks may have no content (metadata chunks):
;; ✅ Check for nil
(when-let [content (streaming/extract-content chunk)]
  (print content))
Close unused channels:
;; Clean up when canceling
(close! ch)