Enhanced BPF event reading from ring buffers with memory mapping and epoll support
Enhanced BPF event reading from ring buffers with memory mapping and epoll support
(backpressure-healthy? consumer
&
{:keys [max-drop-rate max-queue-utilization]
:or {max-drop-rate 5.0 max-queue-utilization 80.0}})Check if the backpressure consumer is healthy.
Returns true if:
Options:
Check if the backpressure consumer is healthy. Returns true if: - Drop rate is below threshold (default 5%) - Queue utilization is below threshold (default 80%) Options: - :max-drop-rate - Maximum acceptable drop rate percentage (default: 5.0) - :max-queue-utilization - Maximum acceptable queue fill percentage (default: 80.0)
(create-backpressure-consumer
{:keys [map callback deserializer max-pending drop-handler batch-size]
:or
{deserializer identity max-pending 10000 drop-handler nil batch-size 64}})Create a ring buffer consumer with backpressure support.
When the processing queue fills up, new events are dropped rather than causing unbounded memory growth. Dropped events can be tracked via statistics or handled via an optional drop handler.
Options:
Returns a BackpressureConsumer record.
Example: (def consumer (create-backpressure-consumer {:map ringbuf-map :callback process-event :deserializer parse-event :max-pending 5000 :drop-handler (fn [_] (log/warn "Event dropped"))}))
Create a ring buffer consumer with backpressure support.
When the processing queue fills up, new events are dropped rather than
causing unbounded memory growth. Dropped events can be tracked via
statistics or handled via an optional drop handler.
Options:
- :map - The ring buffer map (required)
- :callback - Function called for each event (receives deserialized event)
- :deserializer - Optional function to deserialize event bytes (default: identity)
- :max-pending - Maximum events in processing queue before dropping (default: 10000)
- :drop-handler - Optional function called when events are dropped (receives event bytes)
- :batch-size - Number of events to read in each batch (default: 64)
Returns a BackpressureConsumer record.
Example:
(def consumer
(create-backpressure-consumer
{:map ringbuf-map
:callback process-event
:deserializer parse-event
:max-pending 5000
:drop-handler (fn [_] (log/warn "Event dropped"))}))(create-ringbuf-consumer {:keys [map callback deserializer batch-size]
:or {deserializer identity batch-size 64}})Create an enhanced ring buffer consumer with memory mapping and epoll
Options:
Create an enhanced ring buffer consumer with memory mapping and epoll Options: - :map - The ring buffer map (required) - :callback - Function called for each event (receives event data) - :deserializer - Optional function to deserialize event bytes - :batch-size - Number of events to read in each batch (default: 64)
(create-sampling-consumer {:keys [map callback sample-rate deserializer]
:or {sample-rate 0.1 deserializer identity}})Create a consumer that samples events at a configurable rate.
Useful for high-volume event streams where processing every event is not necessary (e.g., statistical sampling, debugging).
Options:
Note: Uses the backpressure consumer internally with a sampling filter.
Create a consumer that samples events at a configurable rate. Useful for high-volume event streams where processing every event is not necessary (e.g., statistical sampling, debugging). Options: - :map - The ring buffer map (required) - :callback - Function called for sampled events - :sample-rate - Fraction of events to sample (0.0 to 1.0, default: 0.1 = 10%) - :deserializer - Optional function to deserialize event bytes Note: Uses the backpressure consumer internally with a sampling filter.
(get-backpressure-stats consumer)Get detailed statistics from the backpressure consumer.
Returns a map with:
Get detailed statistics from the backpressure consumer. Returns a map with: - :events-read - Total events read from ring buffer - :events-processed - Total events successfully processed - :events-dropped - Total events dropped due to backpressure - :queue-full-count - Number of times queue was full - :batches-read - Number of batches read - :errors - Number of errors encountered - :current-queue-size - Current number of events in queue - :max-queue-size - Maximum queue size reached - :drop-rate - Percentage of events dropped - :events-per-second - Average events processed per second - :uptime-ms - Consumer uptime in milliseconds
(get-consumer-stats consumer)Get statistics from ring buffer consumer
Returns map with:
Get statistics from ring buffer consumer Returns map with: - :events-read - Total events read from ring buffer - :events-processed - Total events successfully processed - :batches-read - Number of batches read - :errors - Number of errors encountered - :events-per-second - Average events per second - :uptime-ms - Consumer uptime in milliseconds
(make-event-handler &
{:keys [parser filter transform handler]
:or {parser identity
filter (constantly true)
transform identity
handler println}})Create an event handler with deserialization and filtering
Parameters:
Example: (make-event-handler :parser (make-event-parser [:u32 :u64]) :filter (fn [[pid ts]] (> pid 1000)) :transform (fn [[pid ts]] {:pid pid :timestamp ts}) :handler println)
Create an event handler with deserialization and filtering
Parameters:
- :parser - Parser function (from make-event-parser)
- :filter - Optional predicate to filter events
- :transform - Optional transformation function
- :handler - Final handler function
Example:
(make-event-handler
:parser (make-event-parser [:u32 :u64])
:filter (fn [[pid ts]] (> pid 1000))
:transform (fn [[pid ts]] {:pid pid :timestamp ts})
:handler println)(make-event-parser spec)Create an event parser from a struct spec
Example: (def parse-event (make-event-parser [:u32 :u64 :u32])) (parse-event event-bytes) ;; => [123 456789 42]
Create an event parser from a struct spec Example: (def parse-event (make-event-parser [:u32 :u64 :u32])) (parse-event event-bytes) ;; => [123 456789 42]
(make-event-serializer spec)Create an event serializer from a struct spec
Example: (def serialize-event (make-event-serializer [:u32 :u64 :u32])) (serialize-event [123 456789 42]) ;; => byte-array
Create an event serializer from a struct spec Example: (def serialize-event (make-event-serializer [:u32 :u64 :u32])) (serialize-event [123 456789 42]) ;; => byte-array
(map-ringbuf ringbuf-map)Memory-map a ring buffer map
Parameters:
Returns RingBufLayout with memory segments for consumer, producer, and data
Memory-map a ring buffer map Parameters: - ringbuf-map: BPF ring buffer map Returns RingBufLayout with memory segments for consumer, producer, and data
(peek-ringbuf-events ringbuf-map
&
{:keys [max-events deserializer]
:or {max-events 10 deserializer identity}})Peek at available events without consuming them
Parameters:
Returns vector of events (without removing from ring buffer)
Peek at available events without consuming them Parameters: - ringbuf-map: Ring buffer map - :max-events - Maximum events to peek (default: 10) - :deserializer - Function to deserialize event bytes Returns vector of events (without removing from ring buffer)
(process-events
ringbuf-map
callback
&
{:keys [max-events timeout-ms deserializer]
:or {max-events Integer/MAX_VALUE timeout-ms 1000 deserializer identity}})Process events from a ring buffer synchronously
Parameters:
Returns number of events processed
Process events from a ring buffer synchronously Parameters: - ringbuf-map: Ring buffer map - callback: Function called for each event - :max-events - Maximum number of events to process (default: all available) - :timeout-ms - Timeout in milliseconds (default: 1000) - :deserializer - Function to deserialize event bytes Returns number of events processed
(read-ringbuf-events layout
&
{:keys [max-events] :or {max-events Integer/MAX_VALUE}})Read multiple events from ring buffer
Parameters:
Returns vector of event byte arrays
Read multiple events from ring buffer Parameters: - layout: RingBufLayout - max-events: Maximum number of events to read (default: all available) Returns vector of event byte arrays
(start-backpressure-consumer consumer)Start the backpressure consumer.
Creates two threads:
This separation allows the reader to keep up with the kernel while the processor handles potentially slow callbacks.
Returns the updated consumer with threads started.
Start the backpressure consumer. Creates two threads: 1. Reader thread: Reads events from ring buffer and queues them 2. Processor thread: Processes events from the queue This separation allows the reader to keep up with the kernel while the processor handles potentially slow callbacks. Returns the updated consumer with threads started.
(start-ringbuf-consumer consumer)Start consuming events from ring buffer with epoll-based notification
Start consuming events from ring buffer with epoll-based notification
(stop-backpressure-consumer consumer)Stop the backpressure consumer and clean up resources.
Waits for the processor to drain remaining queued events (up to 5 seconds) before forcefully terminating.
Returns the consumer with threads stopped.
Stop the backpressure consumer and clean up resources. Waits for the processor to drain remaining queued events (up to 5 seconds) before forcefully terminating. Returns the consumer with threads stopped.
(stop-ringbuf-consumer consumer)Stop consuming events from ring buffer
Stop consuming events from ring buffer
(unmap-ringbuf layout)Unmap a ring buffer
Parameters:
Unmap a ring buffer Parameters: - layout: RingBufLayout from map-ringbuf
(with-backpressure-consumer [binding consumer-spec] & body)Create and manage a backpressure consumer with automatic cleanup.
Example: (with-backpressure-consumer [consumer {:map ringbuf-map :callback #(println %) :max-pending 5000}] (Thread/sleep 10000) (println "Stats:" (get-backpressure-stats consumer)) (println "Healthy?" (backpressure-healthy? consumer)))
Create and manage a backpressure consumer with automatic cleanup.
Example:
(with-backpressure-consumer [consumer {:map ringbuf-map
:callback #(println %)
:max-pending 5000}]
(Thread/sleep 10000)
(println "Stats:" (get-backpressure-stats consumer))
(println "Healthy?" (backpressure-healthy? consumer)))(with-ringbuf-consumer [binding consumer-spec] & body)Create and manage a ring buffer consumer with automatic cleanup
Example: (with-ringbuf-consumer [consumer {:map ringbuf-map :callback #(println %) :deserializer parse-event}] (Thread/sleep 5000) (println "Stats:" (get-consumer-stats consumer)))
Create and manage a ring buffer consumer with automatic cleanup
Example:
(with-ringbuf-consumer [consumer {:map ringbuf-map
:callback #(println %)
:deserializer parse-event}]
(Thread/sleep 5000)
(println "Stats:" (get-consumer-stats consumer)))cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |