Raw API for sending and reading Kafka fetch-offset and fetch-message requests. Fetch message responses are read by the Java class kafka_clj.util.Fetch.
(_read-offset-response in)
(get-unique-corr-id)
(send-recv-offset-request metadata-connector host-address topics)
Send an offset request and read the response
(write-fecth-request-message
buff
topics
{:keys [max-wait-time min-bytes max-bytes]
:or {max-wait-time 1000 min-bytes 100 max-bytes 10485760}})
FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
  ReplicaId => int32
  MaxWaitTime => int32
  MinBytes => int32
  TopicName => string
  Partition => int32
  FetchOffset => int64
  MaxBytes => int32
topics = [["<topic-name>" [{:partition <partition>, :offset <offset-to-fetch-from>}]]]
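The FetchRequest grammar above can be illustrated with a small encoder. This is only a sketch under stated assumptions, not the library's implementation: it writes to a `java.nio.ByteBuffer` rather than the `buff` argument the library functions take, the helper names `put-short-string!` and `encode-fetch-request-body` are hypothetical, and it assumes the usual Kafka wire conventions (arrays prefixed with an int32 count, strings prefixed with an int16 length, ReplicaId of -1 for an ordinary consumer). The defaults mirror the `:or` map shown for `write-fecth-request-message`.

```clojure
(import '(java.nio ByteBuffer)
        '(java.nio.charset StandardCharsets))

(defn put-short-string!
  "Hypothetical helper: writes an int16 length followed by the UTF-8 bytes."
  [^ByteBuffer buf ^String s]
  (let [^bytes bs (.getBytes s StandardCharsets/UTF_8)]
    (.putShort buf (short (alength bs)))
    (.put buf bs)))

(defn encode-fetch-request-body
  "Hypothetical helper: encodes the FetchRequest fields in grammar order."
  [topics {:keys [max-wait-time min-bytes max-bytes]
           :or {max-wait-time 1000 min-bytes 100 max-bytes 10485760}}]
  (let [buf (ByteBuffer/allocate 1024)]   ; fixed size, enough for a small demo
    (.putInt buf (int -1))                ; ReplicaId (assumed -1 for clients)
    (.putInt buf (int max-wait-time))     ; MaxWaitTime
    (.putInt buf (int min-bytes))         ; MinBytes
    (.putInt buf (int (count topics)))    ; topic array count
    (doseq [[topic-name partitions] topics]
      (put-short-string! buf topic-name)  ; TopicName
      (.putInt buf (int (count partitions))) ; partition array count
      (doseq [{:keys [partition offset]} partitions]
        (.putInt buf (int partition))     ; Partition
        (.putLong buf (long offset))      ; FetchOffset
        (.putInt buf (int max-bytes))))   ; MaxBytes
    (.flip buf)                           ; switch the buffer to read mode
    buf))

;; One topic, one partition, default options: 44 bytes in total.
(.remaining (encode-fetch-request-body
              [["events" [{:partition 0 :offset 42}]]] {}))
```

The fixed 1024-byte allocation keeps the sketch short; a growable buffer would be needed for larger topic lists.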
(write-fetch-request buff topics state)
topics = [["<topic-name>" [{:partition <partition>, :offset <offset-to-fetch-from>}]]]
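To make the `topics` shape concrete, the following sketch builds it from a nested map of offsets. The helper `->fetch-topics` and the input map layout are hypothetical and are not part of this namespace; only the output shape comes from the docstring above.

```clojure
(defn ->fetch-topics
  "Hypothetical helper: turns {topic {partition offset}} into the
   [[topic [{:partition p :offset o}]]] shape described above."
  [offsets-by-topic]
  (vec (for [[topic parts] (sort-by key offsets-by-topic)]
         [topic (vec (for [[p o] (sort-by key parts)]
                       {:partition p :offset o}))])))

(->fetch-topics {"events" {0 42, 1 17}})
;; => [["events" [{:partition 0, :offset 42} {:partition 1, :offset 17}]]]
```

Sorting by key is only there to make the output order deterministic.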
(write-fetch-request-header buff
topics
{:keys [client-id] :or {client-id "1"} :as state})
RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage
  ApiKey => int16
  ApiVersion => int16
  CorrelationId => int32
  ClientId => string
  RequestMessage => FetchRequestMessage
topics = [["<topic-name>" [{:partition <partition>, :offset <offset-to-fetch-from>}]]]
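The header fields in the grammar above can be sketched as follows. This is an illustration under assumptions rather than the library's code: it uses a `java.nio.ByteBuffer`, the names `api-key-fetch` and `encode-request-header` are hypothetical, the `:api-version` option is added for the example, and the ApiKey value 1 for fetch requests is taken from the Kafka wire protocol rather than from this page. The `client-id` default matches the `:or` map shown for `write-fetch-request-header`.

```clojure
(import '(java.nio ByteBuffer)
        '(java.nio.charset StandardCharsets))

(def api-key-fetch 1) ; assumed Fetch ApiKey from the Kafka protocol

(defn encode-request-header
  "Hypothetical helper: encodes ApiKey, ApiVersion, CorrelationId and
   ClientId in grammar order. The FetchRequestMessage body would follow."
  [{:keys [correlation-id client-id api-version]
    :or {correlation-id 1 client-id "1" api-version 0}}]
  (let [^bytes id-bytes (.getBytes ^String client-id StandardCharsets/UTF_8)
        ;; int16 + int16 + int32 + int16 length prefix + client id bytes
        buf (ByteBuffer/allocate (+ 2 2 4 2 (alength id-bytes)))]
    (.putShort buf (short api-key-fetch))      ; ApiKey
    (.putShort buf (short api-version))        ; ApiVersion
    (.putInt buf (int correlation-id))         ; CorrelationId
    (.putShort buf (short (alength id-bytes))) ; ClientId length
    (.put buf id-bytes)                        ; ClientId bytes
    (.flip buf)                                ; switch the buffer to read mode
    buf))

;; Header for correlation id 7 and client id "abc": 13 bytes.
(.remaining (encode-request-header {:correlation-id 7 :client-id "abc"}))
```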
(write-offset-request buff
{:keys [correlation-id client-id]
:or {correlation-id 1 client-id "1"}
:as state})
(write-offset-request-message buff
{:keys [topics max-offsets]
:or {max-offsets 500}})