Core streaming utilities for LiteLLM
Core streaming utilities for LiteLLM
(accumulate-stream source-ch)Accumulate streaming chunks into complete strings.
Takes a source channel of chunks and returns a new channel that emits accumulated content strings as they grow.
Example: (let [source-ch (completion :model "..." :stream true) acc-ch (accumulate-stream source-ch)] (go-loop [] (when-let [accumulated (<! acc-ch)] (println "So far:" accumulated) (recur))))
Accumulate streaming chunks into complete strings.
Takes a source channel of chunks and returns a new channel that emits
accumulated content strings as they grow.
Example:
  (let [source-ch (completion :model "..." :stream true)
        acc-ch (accumulate-stream source-ch)]
    (go-loop []
      (when-let [accumulated (<! acc-ch)]
        (println "So far:" accumulated)
        (recur))))(close-stream! ch)Safely close a stream channel.
Safely close a stream channel.
(collect-stream source-ch)Blocking function to collect all chunks from a stream into a final string.
Returns a map with:
Example: (let [ch (completion :model "..." :stream true) result (collect-stream ch)] (if (:error result) (println "Error:" (:error result)) (println "Content:" (:content result))))
Blocking function to collect all chunks from a stream into a final string.
Returns a map with:
- :content - The complete accumulated content
- :chunks - Vector of all chunks received
- :error - Error map if an error occurred
Example:
  (let [ch (completion :model "..." :stream true)
        result (collect-stream ch)]
    (if (:error result)
      (println "Error:" (:error result))
      (println "Content:" (:content result))))(consume-stream-with-callbacks ch on-chunk on-complete on-error)Consume a stream channel with callback functions.
This provides a callback-based interface over core.async channels.
Parameters:
Returns nil immediately. All interaction happens through callbacks.
Example: (let [ch (completion :model "..." :stream true)] (consume-stream-with-callbacks ch (fn [chunk] (print (extract-content chunk))) (fn [response] (println "\nDone!" response)) (fn [error] (println "Error:" error))))
Consume a stream channel with callback functions.
This provides a callback-based interface over core.async channels.
Parameters:
- ch: Source channel of chunks
- on-chunk: Called for each chunk (fn [chunk])
- on-complete: Called when stream ends with final accumulated response (fn [response])
- on-error: Called if an error occurs (fn [error-chunk])
Returns nil immediately. All interaction happens through callbacks.
Example:
  (let [ch (completion :model "..." :stream true)]
    (consume-stream-with-callbacks ch
      (fn [chunk] (print (extract-content chunk)))
      (fn [response] (println "\nDone!" response))
      (fn [error] (println "Error:" error))))(create-stream-channel & {:keys [buffer-size] :or {buffer-size 64}})Create a buffered channel for streaming responses.
Options:
Create a buffered channel for streaming responses. Options: - :buffer-size - Size of the channel buffer (default: 64)
(extract-content chunk)Extract content from a streaming chunk.
Returns the incremental content string, or nil if no content present.
Extract content from a streaming chunk. Returns the incremental content string, or nil if no content present.
(extract-finish-reason chunk)Extract finish reason from a streaming chunk.
Extract finish reason from a streaming chunk.
(is-error-chunk? chunk)Check if a chunk is an error chunk.
Check if a chunk is an error chunk.
(parse-sse-line line json-decoder)Parse a Server-Sent Events line.
SSE format: data: {json} data: [DONE]
Returns parsed JSON map or nil.
Parse a Server-Sent Events line.
SSE format:
data: {json}
data: [DONE]
Returns parsed JSON map or nil.(stream-error provider
              message
              &
              {:keys [status code data error-type recoverable?]})Create an error chunk for streaming.
Error chunks are special messages placed on the channel to signal errors.
This is a convenience wrapper around litellm.errors/streaming-error-chunk.
Create an error chunk for streaming. Error chunks are special messages placed on the channel to signal errors. This is a convenience wrapper around litellm.errors/streaming-error-chunk.
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs | 
| ← | Move to previous article | 
| → | Move to next article | 
| Ctrl+/ | Jump to the search field |