(ack! cg msg)
Acknowledge a delivered message. Updates the cursor asynchronously via the flush thread. Not thread-safe: must be called from the same thread as poll!.

(aligned-frame-size payload-len)

(close-consumer-group! cg)

(close-queue! q)

(close-segment! seg)

(consumer-group q group-id)
(consumer-group q group-id {:keys [cursor-fsync-strategy] :or {cursor-fsync-strategy :async}})
Open or create a consumer group on queue q identified by group-id.
Options:
:cursor-fsync-strategy - :async (default), :flush, or :sync
  :async  background thread fsyncs cursor on ack
  :flush  cursor written to mmap but not fsynced (no crash guarantee)
  :sync   cursor fsynced on every ack (slowest)

(current-segment q)

(enqueue! q data)
Write data (a byte array) to the queue. Returns the global offset of the written message. Not thread-safe: single writer only; must not be called concurrently.
Durability depends on the queue's fsync-strategy:
  :async  flushed by background thread; returns immediately
  :flush  written to mmap; no fsync; returns immediately
  :sync   fsynced before returning

(force-segment! seg)

(frame-total-size buf pos)

(lrb-add! lrb v)

(lrb-clear! lrb)

(lrb-empty? lrb)

(lrb-first lrb)

(lrb-new)
(lrb-new init-cap)
Create a new empty LongRingBuffer with the given initial capacity (rounded up to a power of 2).
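
The rounding that lrb-new describes can be illustrated on its own. The sketch below is not the library's code: next-pow2 and slot are hypothetical names, and the mask-based indexing is only an assumption about why a power-of-two capacity is useful for a ring buffer.

```clojure
;; Illustrative only: next-pow2 and slot are hypothetical helpers,
;; not functions exported by the library.

(defn next-pow2
  "Smallest power of two that is >= n (returns 1 for n <= 1)."
  ^long [^long n]
  (if (<= n 1)
    1
    ;; For n > 1, the bit length of (dec n) is the exponent we need.
    (bit-shift-left 1 (- 64 (Long/numberOfLeadingZeros (dec n))))))

(defn slot
  "Map a monotonically increasing index onto a ring of power-of-two
  capacity cap. Masking with (dec cap) replaces a modulo operation."
  ^long [^long idx ^long cap]
  (bit-and idx (dec cap)))

(next-pow2 17) ; => 32
(slot 21 16)   ; => 5
```

With a power-of-two capacity, wrapping an index needs only a bit mask, which is a common reason ring buffers round their capacity this way.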

(lrb-remove! lrb v)

(msg->offset m)

(msg->payload m)

(nack! cg msg)
Negative-ack: rewinds the read-head so the message is redelivered. Not thread-safe: must be called from the same thread as poll!.
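
One way to combine ack! and nack! when working through a batch is sketched below. handle-batch! is a hypothetical helper, not part of the library; the ack and nack operations are passed in as plain functions so the sketch runs without the library on the classpath. Stopping at the first failure is an assumption made here because nack! rewinds the read-head; the source does not say what happens to later messages in the same batch.

```clojure
;; Illustrative only: handle-batch! is a hypothetical helper.

(defn handle-batch!
  "Runs handler on each message in order. Acks each message that is
  handled without an exception; on the first exception, nacks that
  message and stops. Returns the number of messages acked."
  [ack-fn nack-fn handler msgs]
  (reduce (fn [acked msg]
            (try
              (handler msg)
              (ack-fn msg)
              (inc acked)
              (catch Exception _
                (nack-fn msg)
                ;; Stop here: the remaining messages are left untouched.
                (reduced acked))))
          0
          msgs))
```

With a consumer group cg, a call would presumably look like (handle-batch! #(ack! cg %) #(nack! cg %) handler (poll! cg)), made entirely on the single thread that owns cg.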

(open-segment path base-offset segment-size)

(payload->bytes bb)
Copy a payload ByteBuffer to a fresh byte array.
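
A copy of this kind can be written with plain java.nio calls. The sketch below uses a hypothetical name, copy-payload, and is not the library's implementation; in particular, leaving the source buffer's position untouched is a choice made here, not something the source states about payload->bytes.

```clojure
(import 'java.nio.ByteBuffer)

;; Illustrative only: copy-payload is a hypothetical stand-in.
(defn copy-payload
  "Copy the remaining bytes of bb into a new byte array.
  Works on a duplicate so bb's position and limit are not changed."
  ^bytes [^ByteBuffer bb]
  (let [^ByteBuffer dup (.duplicate bb)
        ^bytes out (byte-array (.remaining dup))]
    (.get dup out) ; bulk read from the duplicate into out
    out))

;; Usage: the returned array is independent of the buffer.
(let [bb (ByteBuffer/wrap (byte-array [1 2 3]))]
  (vec (copy-payload bb))) ; => [1 2 3]
```

Copying is useful when a payload has to outlive the buffer it was read from, for example when the buffer is a view over a memory-mapped segment.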

(poll! cg)
(poll! cg {:keys [max-batch timeout-ms park-ns] :or {max-batch 256 timeout-ms 1 park-ns 10000}})
Adaptive batch poll. Returns a vector of Msg records.
Reads up to max-batch messages or blocks up to timeout-ms before returning whatever has accumulated (possibly empty).
Access msg fields via (.offset msg) and (.payload msg).
Not thread-safe: a ConsumerGroup must be used from a single thread.
Multiple independent consumer groups may read the same Queue concurrently.
Options:
:max-batch - maximum messages to return (default 256)
:timeout-ms - max wait in milliseconds (default 1)
:park-ns - nanoseconds to park between empty polls (default 10000 = 10µs). Lower values reduce latency at the cost of CPU; 0 busy-spins.

(queue dir)
(queue dir {:keys [segment-size fsync-strategy commit-interval-us] :or {segment-size default-segment-size fsync-strategy :async commit-interval-us 50}})
Open (or recover) a queue stored at dir.
Options:
:segment-size - bytes per segment file (default 256MB)
:fsync-strategy - :async (default), :flush, or :sync
  :async  background thread fsyncs; low latency, small durability window
  :flush  no explicit fsync; rely on OS writeback; no crash guarantee
  :sync   fsync on every enqueue!; max durability, lowest throughput
:commit-interval-us - fsync interval in microseconds for :async (default 50)

(recover-segment! seg)
Scan frames from position 0 and set write-pos and committed-pos to the end of the last valid contiguous frame.
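
The batching behaviour documented for poll! (collect up to max-batch, give up after timeout-ms, park for park-ns between empty reads) can be sketched independently of the library. Everything below is illustrative: poll-sketch and read-one! are hypothetical names, and the loop is one literal reading of the docstring, not the library's actual algorithm.

```clojure
(import 'java.util.concurrent.locks.LockSupport)

;; Illustrative only. read-one! is a hypothetical zero-argument function
;; that returns the next available message, or nil if none is ready.
(defn poll-sketch
  ([read-one!] (poll-sketch read-one! {}))
  ([read-one! {:keys [max-batch timeout-ms park-ns]
               :or {max-batch 256 timeout-ms 1 park-ns 10000}}]
   (let [deadline (+ (System/nanoTime) (* timeout-ms 1000000))]
     (loop [batch []]
       (if (>= (count batch) max-batch)
         batch                                   ; batch is full
         (if-some [m (read-one!)]
           (recur (conj batch m))                ; got one, keep reading
           (if (>= (System/nanoTime) deadline)
             batch                               ; timed out, possibly empty
             (do
               ;; park-ns of 0 skips parking, so the loop busy-spins
               (when (pos? park-ns)
                 (LockSupport/parkNanos park-ns))
               (recur batch)))))))))
```

The trade-off named in the options is visible here: a smaller park-ns re-checks for messages sooner, at the cost of spending more CPU while the source is empty.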

(seek! cg offset)
Reset the consumer read position to the given global offset. Clears all pending unacked messages. The next poll! will deliver messages from offset onwards. Not thread-safe: must be called from the same thread as poll!.
offset may be:
- any valid global offset (e.g. from a previous Msg)
- 0 to replay from the beginning of the queue

(segment-full? seg frame-size)

(valid-frame? buf pos capacity)
Returns true if there is a valid frame at pos in buf.

(write-frame! buf pos data frame-size)
Write a framed message into buf at position pos. frame-size must be (aligned-frame-size (alength data)); it is passed in to avoid recomputing it, since enqueue! already has this value.
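
The source does not give the frame layout, so the following only illustrates how an aligned frame size is typically derived from a payload length. The 4-byte header and 8-byte alignment are assumptions chosen for the example, and aligned-size is a hypothetical name, not the library's aligned-frame-size.

```clojure
;; Illustrative only: both constants are assumptions, not the
;; library's actual frame layout.
(def ^:const header-bytes 4) ; hypothetical length header
(def ^:const alignment 8)    ; hypothetical alignment, must be a power of 2

(defn aligned-size
  "Header plus payload, rounded up to the next multiple of alignment."
  ^long [^long payload-len]
  (let [raw (+ header-bytes payload-len)]
    ;; Adding (alignment - 1) and clearing the low bits rounds up.
    (bit-and (+ raw (dec alignment)) (bit-not (dec alignment)))))

(aligned-size 5)  ; => 16  (4 + 5 = 9, rounded up to 16)
(aligned-size 12) ; => 16  (4 + 12 = 16, already aligned)
```

Computing this once per message and passing the result along, as the write-frame! docstring describes, avoids repeating the arithmetic on the write path.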