s-exp.k7


ack!

(ack! cg msg)

Acknowledge a delivered message.
Updates cursor asynchronously via flush thread.
Not thread-safe: must be called from the same thread as poll!.

aligned-frame-size

(aligned-frame-size payload-len)

close-consumer-group!

(close-consumer-group! cg)

close-queue!

(close-queue! q)

close-segment!

(close-segment! seg)

consumer-group

(consumer-group q group-id)
(consumer-group q
                group-id
                {:keys [cursor-fsync-strategy]
                 :or {cursor-fsync-strategy :async}})

Open or create a consumer group on queue q identified by group-id.

Options:
  :cursor-fsync-strategy — :async (default), :flush, or :sync
                           :async  background thread fsyncs cursor on ack
                           :flush  cursor written to mmap but not fsynced (no crash guarantee)
                           :sync   cursor fsynced on every ack (slowest)
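A minimal sketch of opening a consumer group with the `:sync` cursor strategy and releasing resources afterwards. It assumes the namespace is available as `s-exp.k7`, that `dir` can be given as a path string, and that a string is an acceptable `group-id`; `with-sync-group` is a hypothetical helper, not part of the library.

```clojure
(require '[s-exp.k7 :as k7])

(defn with-sync-group
  "Opens the queue at dir and the consumer group group-id with
  :cursor-fsync-strategy :sync, calls (f cg), then closes both.
  Hypothetical helper for illustration only."
  [dir group-id f]
  (let [q  (k7/queue dir)
        cg (k7/consumer-group q group-id {:cursor-fsync-strategy :sync})]
    (try
      (f cg)
      (finally
        ;; Close the group before the queue it reads from.
        (k7/close-consumer-group! cg)
        (k7/close-queue! q)))))
```

Choosing `:sync` here trades ack latency for a cursor that is fsynced on every ack; `:async` or `:flush` would be passed the same way.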

current-segment

(current-segment q)

cursor-file-size

default-segment-size

enqueue!

(enqueue! q data)

Write data (byte array) to the queue.
Returns the global offset of the written message.
Not thread-safe: single writer only — must not be called concurrently.
Durability depends on the queue's fsync-strategy:
  :async  — flushed by background thread; returns immediately
  :flush  — written to mmap; no fsync; returns immediately
  :sync   — fsynced before returning
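As a rough illustration of the single-writer rule, the sketch below enqueues a batch of strings sequentially from one thread and collects the returned global offsets. `enqueue-strings!` is a hypothetical helper, and the UTF-8 encoding is just one possible choice, since the queue itself only sees byte arrays.

```clojure
(require '[s-exp.k7 :as k7])

(defn enqueue-strings!
  "Enqueues each string as UTF-8 bytes, in order, and returns the vector
  of global offsets reported by enqueue!. Hypothetical helper; call it
  only from the single writer thread."
  [q strings]
  ;; mapv is eager and sequential, so enqueue! is never called concurrently.
  (mapv (fn [^String s]
          (k7/enqueue! q (.getBytes s "UTF-8")))
        strings))
```

With a queue opened using `{:fsync-strategy :sync}`, each call would return only after the fsync; with `:async` or `:flush` it returns immediately, as described above.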

flag-committed

force-segment!

(force-segment! seg)

frame-align

frame-align-mask

frame-header-size

frame-total-size

(frame-total-size buf pos)

lrb-add!

(lrb-add! lrb v)

lrb-clear!

(lrb-clear! lrb)

lrb-empty?

(lrb-empty? lrb)

lrb-first

(lrb-first lrb)

lrb-new

(lrb-new)
(lrb-new init-cap)

Create a new empty LongRingBuffer with the given initial capacity (rounded up to power of 2).
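The rounding to a power of two mentioned above can be sketched as follows. This is one common way to do it, not necessarily what the library does internally; `next-pow2` and `ring-index` are hypothetical names. Power-of-two capacities are usually chosen so that wrapping an index becomes a bit mask rather than a modulo.

```clojure
(defn next-pow2
  "Smallest power of two >= n, for 1 <= n <= 2^62. Hypothetical helper."
  ^long [^long n]
  (if (<= n 1)
    1
    ;; The highest set bit of (n - 1), shifted left once, rounds up;
    ;; exact powers of two map to themselves.
    (bit-shift-left (Long/highestOneBit (dec n)) 1)))

(defn ring-index
  "Wraps logical position i into a buffer whose capacity is a power of two."
  ^long [^long i ^long capacity]
  (bit-and i (dec capacity)))
```

For example, a requested capacity of 5 would round up to 8, and logical position 10 in an 8-slot buffer would land on slot 2.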


lrb-remove!

(lrb-remove! lrb v)

magic

msg->offset

(msg->offset m)

msg->payload

(msg->payload m)

nack!

(nack! cg msg)

Negative-ack: rewinds read-head so the message is redelivered.
Not thread-safe: must be called from the same thread as poll!.
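One way ack! and nack! might be combined in a handler loop is sketched below. `handle-batch!` is a hypothetical helper. It stops at the first failure on the assumption, not confirmed by the docstring, that rewinding the read-head causes the failed message and the ones after it to be delivered again by a later poll!.

```clojure
(require '[s-exp.k7 :as k7])

(defn handle-batch!
  "Polls one batch and calls (handler msg) on each message, acking on
  success. On the first exception the failing message is nacked and the
  rest of the batch is left for redelivery (an assumption, see above).
  Returns the number of acked messages. Hypothetical helper; call only
  from the thread that owns cg."
  [cg handler]
  (loop [msgs  (seq (k7/poll! cg))
         acked 0]
    (if-let [msg (first msgs)]
      (if (try
            (handler msg)
            true
            (catch Exception _ false))
        (do (k7/ack! cg msg)
            (recur (next msgs) (inc acked)))
        (do (k7/nack! cg msg)
            acked))
      acked)))
```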

open-segment

(open-segment path base-offset segment-size)

payload->bytes

(payload->bytes bb)

Copy a payload ByteBuffer to a fresh byte array.


poll!

(poll! cg)
(poll! cg
       {:keys [max-batch timeout-ms park-ns]
        :or {max-batch 256 timeout-ms 1 park-ns 10000}})

Adaptive batch poll. Returns a vector of Msg records.
Reads up to max-batch messages or blocks up to timeout-ms before returning
whatever has accumulated (possibly empty).
Access msg fields via (.offset msg) and (.payload msg).

Not thread-safe: a ConsumerGroup must be used from a single thread.
Multiple independent consumer groups may read the same Queue concurrently.

Options:
  :max-batch   — maximum messages to return (default 256)
  :timeout-ms  — max wait in milliseconds (default 1)
  :park-ns     — nanoseconds to park between empty polls (default 10000 = 10µs).
                 Lower values reduce latency at the cost of CPU; 0 busy-spins.
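A small sketch of a single poll with explicit options, decoding payloads and acking as it goes. `drain-strings!` is a hypothetical helper; it assumes that `msg->payload` returns the payload ByteBuffer accepted by `payload->bytes` and that payloads were written as UTF-8 text.

```clojure
(require '[s-exp.k7 :as k7])

(defn drain-strings!
  "Polls cg once with opts, acks every message and returns the payloads
  decoded as UTF-8 strings. Hypothetical helper; call only from the
  thread that owns cg."
  [cg opts]
  (mapv (fn [msg]
          (let [bs (k7/payload->bytes (k7/msg->payload msg))
                s  (String. ^bytes bs "UTF-8")]
            ;; Copy the payload out before acking the message.
            (k7/ack! cg msg)
            s))
        (k7/poll! cg opts)))
```

A call such as `(drain-strings! cg {:max-batch 64 :timeout-ms 10 :park-ns 0})` would busy-spin for up to 10 ms, which favours latency over CPU use; the result may be an empty vector if nothing arrived in time.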

queue

(queue dir)
(queue dir
       {:keys [segment-size fsync-strategy commit-interval-us]
        :or {segment-size default-segment-size
             fsync-strategy :async
             commit-interval-us 50}})

Open (or recover) a queue stored at dir.

Options:
  :segment-size       — bytes per segment file (default 256MB)
  :fsync-strategy     — :async (default), :flush, or :sync
                        :async  background thread fsyncs; low latency, small durability window
                        :flush  no explicit fsync; rely on OS writeback; no crash guarantee
                        :sync   fsync on every enqueue!; max durability, lowest throughput
  :commit-interval-us — fsync interval in microseconds for :async (default 50)
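For illustration, the options above could be passed like this. The specific values (64 MiB segments, a 200 µs commit interval) are hypothetical and chosen only to show units; whether they suit a given workload, and whether `dir` may be a path string, are assumptions.

```clojure
(require '[s-exp.k7 :as k7])

(def example-queue-opts
  "Hypothetical settings: 64 MiB segments, background fsync every 200 µs."
  {:segment-size       (* 64 1024 1024) ; bytes per segment file
   :fsync-strategy     :async
   :commit-interval-us 200})            ; only meaningful for :async

(defn open-example-queue
  "Opens (or recovers) the queue at dir with example-queue-opts."
  [dir]
  (k7/queue dir example-queue-opts))
```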

recover-segment!

(recover-segment! seg)

Scan frames from position 0, set write-pos/committed-pos to end
of last valid contiguous frame.

seek!

(seek! cg offset)

Reset the consumer read position to the global offset given by offset.
Clears all pending unacked messages. The next poll! will deliver
messages from offset onwards.
Not thread-safe: must be called from the same thread as poll!.
offset may be:
  - any valid global offset (e.g. from a previous Msg)
  - 0 to replay from the beginning of the queue
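Replaying from the beginning might look like the sketch below. `replay-from-start!` is a hypothetical helper, and the 100 ms timeout is an arbitrary choice. Since seek! clears pending unacked messages, anything delivered but not yet acked before the call is simply delivered again.

```clojure
(require '[s-exp.k7 :as k7])

(defn replay-from-start!
  "Seeks cg to offset 0 and returns the offsets of the first batch
  delivered afterwards. Hypothetical helper; call only from the thread
  that polls cg."
  [cg]
  (k7/seek! cg 0)
  (mapv k7/msg->offset (k7/poll! cg {:timeout-ms 100})))
```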

segment-full?

(segment-full? seg frame-size)

valid-frame?

(valid-frame? buf pos capacity)

Returns true if there is a valid frame at pos in buf.


write-frame!

(write-frame! buf pos data frame-size)

Write a framed message into buf at position pos.
frame-size must be (aligned-frame-size (alength data)) — passed in to avoid
recomputing it since enqueue! already has this value.
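To make the frame-size requirement concrete, here is a sketch of how an aligned frame size is commonly computed: header plus payload, rounded up to the alignment boundary. The header size and alignment below are assumed values, since the real `frame-header-size` and `frame-align` constants are not shown on this page, and `example-aligned-frame-size` is a hypothetical stand-in for `aligned-frame-size`.

```clojure
;; ASSUMED values for illustration only; the library's actual
;; frame-header-size and frame-align may differ.
(def example-header-size 8)
(def example-align 8)
(def example-align-mask (dec example-align))

(defn example-aligned-frame-size
  "Header plus payload, rounded up to the next multiple of example-align.
  Relies on example-align being a power of two."
  ^long [^long payload-len]
  (bit-and (+ example-header-size payload-len example-align-mask)
           (bit-not example-align-mask)))
```

Under these assumptions a 5-byte payload would occupy a 16-byte frame, and a caller would compute that size once and pass it along rather than recomputing it at write time.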
