
clj-ebpf.events

Enhanced BPF event reading from ring buffers with memory mapping and epoll support

backpressure-healthy? (clj)

(backpressure-healthy? consumer
                       &
                       {:keys [max-drop-rate max-queue-utilization]
                        :or {max-drop-rate 5.0 max-queue-utilization 80.0}})

Check if the backpressure consumer is healthy.

Returns true if:

  • Drop rate is below threshold (default 5%)
  • Queue utilization is below threshold (default 80%)

Options:

  • :max-drop-rate - Maximum acceptable drop rate percentage (default: 5.0)
  • :max-queue-utilization - Maximum acceptable queue fill percentage (default: 80.0)
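
The check described above can be sketched as a pure function over a stats map. This is only an illustration, not the library's implementation: `healthy-sketch?` is a hypothetical name, and computing utilization as `:current-queue-size` divided by a separately supplied `max-pending` is an assumption.

```clojure
;; Hypothetical sketch: healthy when both metrics are strictly below their thresholds.
(defn healthy-sketch?
  [{:keys [drop-rate current-queue-size]} max-pending
   & {:keys [max-drop-rate max-queue-utilization]
      :or {max-drop-rate 5.0 max-queue-utilization 80.0}}]
  ;; Assumption: utilization is the queue fill level as a percentage of max-pending.
  (let [utilization (/ (* 100.0 current-queue-size) max-pending)]
    (and (< drop-rate max-drop-rate)
         (< utilization max-queue-utilization))))

(healthy-sketch? {:drop-rate 1.0 :current-queue-size 100} 1000)
;; => true
```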

create-backpressure-consumer (clj)

(create-backpressure-consumer
  {:keys [map callback deserializer max-pending drop-handler batch-size]
   :or
     {deserializer identity max-pending 10000 drop-handler nil batch-size 64}})

Create a ring buffer consumer with backpressure support.

When the processing queue fills up, new events are dropped rather than causing unbounded memory growth. Dropped events can be tracked via statistics or handled via an optional drop handler.

Options:

  • :map - The ring buffer map (required)
  • :callback - Function called for each event (receives deserialized event)
  • :deserializer - Optional function to deserialize event bytes (default: identity)
  • :max-pending - Maximum events in processing queue before dropping (default: 10000)
  • :drop-handler - Optional function called when events are dropped (receives event bytes)
  • :batch-size - Number of events to read in each batch (default: 64)

Returns a BackpressureConsumer record.
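
The drop-on-full behaviour can be illustrated with a bounded queue from `java.util.concurrent`. This is a sketch under assumptions, not the library's code: `offer-or-drop!` is a hypothetical helper, and the use of `ArrayBlockingQueue` is a choice made here for illustration.

```clojure
(import '(java.util.concurrent ArrayBlockingQueue))

;; Hypothetical helper: enqueue without blocking; on a full queue, count the
;; drop and call the optional drop handler instead of growing memory.
(defn offer-or-drop!
  [^ArrayBlockingQueue queue dropped-count drop-handler event]
  (if (.offer queue event)
    true
    (do
      (swap! dropped-count inc)
      (when drop-handler (drop-handler event))
      false)))

;; Usage: with capacity 2, the third event is dropped.
(def queue (ArrayBlockingQueue. 2))
(def dropped-count (atom 0))
(doseq [event [:a :b :c]]
  (offer-or-drop! queue dropped-count nil event))
```

After this runs, the queue holds two events and `dropped-count` is 1.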

Example:
  (def consumer
    (create-backpressure-consumer
      {:map ringbuf-map
       :callback process-event
       :deserializer parse-event
       :max-pending 5000
       :drop-handler (fn [_] (log/warn "Event dropped"))}))

create-ringbuf-consumer (clj)

(create-ringbuf-consumer {:keys [map callback deserializer batch-size]
                          :or {deserializer identity batch-size 64}})

Create an enhanced ring buffer consumer with memory mapping and epoll support.

Options:

  • :map - The ring buffer map (required)
  • :callback - Function called for each event (receives event data)
  • :deserializer - Optional function to deserialize event bytes
  • :batch-size - Number of events to read in each batch (default: 64)

create-sampling-consumer (clj)

(create-sampling-consumer {:keys [map callback sample-rate deserializer]
                           :or {sample-rate 0.1 deserializer identity}})

Create a consumer that samples events at a configurable rate.

Useful for high-volume event streams where processing every event is not necessary (e.g., statistical sampling, debugging).

Options:

  • :map - The ring buffer map (required)
  • :callback - Function called for sampled events
  • :sample-rate - Fraction of events to sample (0.0 to 1.0, default: 0.1 = 10%)
  • :deserializer - Optional function to deserialize event bytes

Note: Uses the backpressure consumer internally with a sampling filter.
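
A sampling filter of this kind can be sketched as a wrapper around the callback. The name `sampling-callback` is hypothetical, and using `rand` for the sampling decision is an assumption; the library's actual filter may work differently.

```clojure
;; Hypothetical sketch: forward each event to callback with probability sample-rate.
;; (rand) returns a value in [0, 1), so 1.0 keeps every event and 0.0 keeps none.
(defn sampling-callback
  [callback sample-rate]
  (fn [event]
    (when (< (rand) sample-rate)
      (callback event))))

;; Usage: roughly 10% of events reach the wrapped callback.
(def sampled (sampling-callback println 0.1))
```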

get-backpressure-stats (clj)

(get-backpressure-stats consumer)

Get detailed statistics from the backpressure consumer.

Returns a map with:

  • :events-read - Total events read from ring buffer
  • :events-processed - Total events successfully processed
  • :events-dropped - Total events dropped due to backpressure
  • :queue-full-count - Number of times queue was full
  • :batches-read - Number of batches read
  • :errors - Number of errors encountered
  • :current-queue-size - Current number of events in queue
  • :max-queue-size - Maximum queue size reached
  • :drop-rate - Percentage of events dropped
  • :events-per-second - Average events processed per second
  • :uptime-ms - Consumer uptime in milliseconds
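
The two derived values in this map can be related to the raw counters as follows. This is an assumed derivation for illustration only: `derive-rates` is a hypothetical function, and the choice of `:events-read` as the denominator for the drop rate is an assumption that the documentation does not confirm.

```clojure
;; Hypothetical sketch: compute the derived rates from the raw counters.
(defn derive-rates
  [{:keys [events-read events-dropped events-processed uptime-ms]}]
  {;; Assumption: drop rate is dropped events as a percentage of events read.
   :drop-rate (if (pos? events-read)
                (/ (* 100.0 events-dropped) events-read)
                0.0)
   ;; Average processing throughput over the consumer's uptime.
   :events-per-second (if (pos? uptime-ms)
                        (/ events-processed (/ uptime-ms 1000.0))
                        0.0)})

(derive-rates {:events-read 1000 :events-dropped 50
               :events-processed 950 :uptime-ms 2000})
;; => {:drop-rate 5.0, :events-per-second 475.0}
```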

get-consumer-stats (clj)

(get-consumer-stats consumer)

Get statistics from ring buffer consumer

Returns map with:

  • :events-read - Total events read from ring buffer
  • :events-processed - Total events successfully processed
  • :batches-read - Number of batches read
  • :errors - Number of errors encountered
  • :events-per-second - Average events per second
  • :uptime-ms - Consumer uptime in milliseconds

make-event-handler (clj)

(make-event-handler &
                    {:keys [parser filter transform handler]
                     :or {parser identity
                          filter (constantly true)
                          transform identity
                          handler println}})

Create an event handler with deserialization and filtering

Parameters:

  • :parser - Parser function (from make-event-parser)
  • :filter - Optional predicate to filter events
  • :transform - Optional transformation function
  • :handler - Final handler function

Example:
  (make-event-handler
    :parser (make-event-parser [:u32 :u64])
    :filter (fn [[pid ts]] (> pid 1000))
    :transform (fn [[pid ts]] {:pid pid :timestamp ts})
    :handler println)
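
The example suggests a pipeline of parse, filter, transform, then handle, with the filter seeing the parsed (not yet transformed) event. A minimal sketch of that composition, assuming this ordering and using the hypothetical name `make-handler-sketch`:

```clojure
;; Hypothetical sketch of the pipeline: parse -> filter -> transform -> handle.
(defn make-handler-sketch
  [& {:keys [parser filter transform handler]
      :or {parser identity
           filter (constantly true)
           transform identity
           handler println}}]
  (fn [raw-event]
    (let [event (parser raw-event)]
      ;; Events rejected by the predicate never reach transform or handler.
      (when (filter event)
        (handler (transform event))))))

;; Usage with already-parsed [pid ts] vectors (parser defaults to identity).
(def seen (atom []))
(def handle!
  (make-handler-sketch
    :filter (fn [[pid _]] (> pid 1000))
    :transform (fn [[pid ts]] {:pid pid :timestamp ts})
    :handler #(swap! seen conj %)))
(handle! [1 10])
(handle! [2000 20])
```

After both calls, `seen` contains only the transformed second event.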

make-event-parser (clj)

(make-event-parser spec)

Create an event parser from a struct spec

Example:
  (def parse-event (make-event-parser [:u32 :u64 :u32]))
  (parse-event event-bytes)
  ;; => [123 456789 42]
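
To show what parsing from a struct spec involves, here is a sketch built on `java.nio.ByteBuffer`. It is not the library's parser: `make-parser-sketch` is a hypothetical name, and little-endian byte order, tightly packed fields, and support for only `:u32` and `:u64` are all assumptions.

```clojure
(import '(java.nio ByteBuffer ByteOrder))

;; Hypothetical sketch: read fields in spec order from a little-endian buffer.
(defn make-parser-sketch
  [spec]
  (fn [^bytes data]
    (let [^ByteBuffer buf (.order (ByteBuffer/wrap data) ByteOrder/LITTLE_ENDIAN)]
      (mapv (fn [field-type]
              (case field-type
                ;; Widen to long so values above 2^31 - 1 stay non-negative.
                :u32 (Integer/toUnsignedLong (.getInt buf))
                ;; Returned as a signed long; values >= 2^63 appear negative.
                :u64 (.getLong buf)))
            spec))))

(def parse-sketch (make-parser-sketch [:u32 :u64 :u32]))
(parse-sketch (byte-array [123 0 0 0  85 -8 6 0 0 0 0 0  42 0 0 0]))
;; => [123 456789 42]
```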

make-event-serializer (clj)

(make-event-serializer spec)

Create an event serializer from a struct spec

Example:
  (def serialize-event (make-event-serializer [:u32 :u64 :u32]))
  (serialize-event [123 456789 42])
  ;; => byte-array
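
Serialization is the reverse operation. The sketch below is again an illustration rather than the library's code: `make-serializer-sketch` is a hypothetical name, and little-endian order with tightly packed `:u32` and `:u64` fields is assumed.

```clojure
(import '(java.nio ByteBuffer ByteOrder))

;; Hypothetical sketch: write values in spec order into a little-endian buffer.
(defn make-serializer-sketch
  [spec]
  (let [size (reduce + (map {:u32 4 :u64 8} spec))]
    (fn [values]
      (let [^ByteBuffer buf (.order (ByteBuffer/allocate (int size))
                                    ByteOrder/LITTLE_ENDIAN)]
        (doseq [[field-type value] (map vector spec values)]
          (case field-type
            ;; unchecked-int keeps the low 32 bits, so 0..2^32-1 round-trips.
            :u32 (.putInt buf (unchecked-int value))
            :u64 (.putLong buf (long value))))
        (.array buf)))))

(def serialize-sketch (make-serializer-sketch [:u32 :u64 :u32]))
(vec (serialize-sketch [123 456789 42]))
;; => [123 0 0 0 85 -8 6 0 0 0 0 0 42 0 0 0]
```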

map-ringbuf (clj)

(map-ringbuf ringbuf-map)

Memory-map a ring buffer map

Parameters:

  • ringbuf-map: BPF ring buffer map

Returns RingBufLayout with memory segments for consumer, producer, and data

peek-ringbuf-events (clj)

(peek-ringbuf-events ringbuf-map
                     &
                     {:keys [max-events deserializer]
                      :or {max-events 10 deserializer identity}})

Peek at available events without consuming them

Parameters:

  • ringbuf-map: Ring buffer map
  • :max-events - Maximum events to peek (default: 10)
  • :deserializer - Function to deserialize event bytes

Returns vector of events (without removing from ring buffer)

process-events (clj)

(process-events
  ringbuf-map
  callback
  &
  {:keys [max-events timeout-ms deserializer]
   :or {max-events Integer/MAX_VALUE timeout-ms 1000 deserializer identity}})

Process events from a ring buffer synchronously

Parameters:

  • ringbuf-map: Ring buffer map
  • callback: Function called for each event
  • :max-events - Maximum number of events to process (default: all available)
  • :timeout-ms - Timeout in milliseconds (default: 1000)
  • :deserializer - Function to deserialize event bytes

Returns number of events processed

read-ringbuf-events (clj)

(read-ringbuf-events layout
                     &
                     {:keys [max-events] :or {max-events Integer/MAX_VALUE}})

Read multiple events from ring buffer

Parameters:

  • layout: RingBufLayout
  • :max-events - Maximum number of events to read (default: all available)

Returns vector of event byte arrays

start-backpressure-consumer (clj)

(start-backpressure-consumer consumer)

Start the backpressure consumer.

Creates two threads:

  1. Reader thread: Reads events from ring buffer and queues them
  2. Processor thread: Processes events from the queue

This separation allows the reader to keep up with the kernel while the processor handles potentially slow callbacks.

Returns the updated consumer with threads started.
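
The reader/processor split can be illustrated without a real ring buffer. In the sketch below a plain sequence stands in for the kernel side, and `run-pipeline-sketch` is a hypothetical function; the queue type, poll interval, and shutdown signalling are assumptions, not the library's implementation.

```clojure
(import '(java.util.concurrent ArrayBlockingQueue TimeUnit))

;; Hypothetical sketch: one thread enqueues events, another drains the queue.
(defn run-pipeline-sketch
  [events callback max-pending]
  (let [queue (ArrayBlockingQueue. (int max-pending))
        reader-done? (atom false)
        dropped (atom 0)
        ;; Reader: never blocks; a full queue means the event is dropped.
        reader (Thread.
                 (fn []
                   (doseq [event events]
                     (when-not (.offer queue event)
                       (swap! dropped inc)))
                   (reset! reader-done? true)))
        ;; Processor: runs the (possibly slow) callback. It checks the done
        ;; flag before polling, so it only exits once the queue is drained.
        processor (Thread.
                    (fn []
                      (loop []
                        (let [finished? @reader-done?
                              event (.poll queue 10 TimeUnit/MILLISECONDS)]
                          (cond
                            (some? event) (do (callback event) (recur))
                            (not finished?) (recur))))))]
    (.start reader)
    (.start processor)
    (.join reader)
    (.join processor)
    {:dropped @dropped}))
```

Every event is either passed to the callback or counted in `:dropped`, so a slow callback costs events rather than memory.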

start-ringbuf-consumer (clj)

(start-ringbuf-consumer consumer)

Start consuming events from ring buffer with epoll-based notification

stop-backpressure-consumer (clj)

(stop-backpressure-consumer consumer)

Stop the backpressure consumer and clean up resources.

Waits for the processor to drain remaining queued events (up to 5 seconds) before forcefully terminating.

Returns the consumer with threads stopped.
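
A wait-then-terminate shutdown like the one described can be sketched with `Thread.join` and `Thread.interrupt`. The function `stop-thread-sketch` is hypothetical, and interrupting the thread is an assumed way of "forcefully terminating"; the library may do this differently.

```clojure
;; Hypothetical sketch: give the thread timeout-ms to finish, then interrupt it.
(defn stop-thread-sketch
  [^Thread thread timeout-ms]
  (.join thread (long timeout-ms))
  (if (.isAlive thread)
    (do (.interrupt thread) :interrupted)
    :drained))

;; Usage: a short-lived thread finishes well within the 5-second window.
(def worker (doto (Thread. (fn [] (Thread/sleep 10))) (.start)))
(stop-thread-sketch worker 5000)
;; => :drained
```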

stop-ringbuf-consumer (clj)

(stop-ringbuf-consumer consumer)

Stop consuming events from ring buffer

unmap-ringbuf (clj)

(unmap-ringbuf layout)

Unmap a ring buffer

Parameters:

  • layout: RingBufLayout from map-ringbuf

with-backpressure-consumer (clj, macro)

(with-backpressure-consumer [binding consumer-spec] & body)

Create and manage a backpressure consumer with automatic cleanup.

Example:
  (with-backpressure-consumer [consumer {:map ringbuf-map
                                         :callback #(println %)
                                         :max-pending 5000}]
    (Thread/sleep 10000)
    (println "Stats:" (get-backpressure-stats consumer))
    (println "Healthy?" (backpressure-healthy? consumer)))
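
Automatic cleanup of this kind is commonly built on `try`/`finally`. The macro below is a generic sketch of that pattern, not the library's definition: `with-managed-sketch` and its explicit start and stop function arguments are hypothetical.

```clojure
;; Hypothetical sketch: start a resource, run body, always stop the resource.
(defmacro with-managed-sketch
  [[binding resource-spec start-fn stop-fn] & body]
  `(let [~binding (~start-fn ~resource-spec)]
     (try
       ~@body
       (finally
         ;; Runs on normal exit and when body throws.
         (~stop-fn ~binding)))))

;; Usage with stand-in start/stop functions that record what happened.
(def lifecycle (atom []))
(defn start! [spec] (swap! lifecycle conj :start) (assoc spec :running true))
(defn stop! [consumer] (swap! lifecycle conj :stop) (assoc consumer :running false))

(with-managed-sketch [consumer {:id 1} start! stop!]
  (:running consumer))
;; => true
```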

with-ringbuf-consumer (clj, macro)

(with-ringbuf-consumer [binding consumer-spec] & body)

Create and manage a ring buffer consumer with automatic cleanup

Example:
  (with-ringbuf-consumer [consumer {:map ringbuf-map
                                    :callback #(println %)
                                    :deserializer parse-event}]
    (Thread/sleep 5000)
    (println "Stats:" (get-consumer-stats consumer)))
