(close {{:keys [shutdown-flag activity-counter metadata-connector
scheduled-service retry-cache]}
:state
msg-ch :msg-ch
persist-delay-thread :persist-delay-thread
async-latch :async-latch})
Close all producers and channels created for the connected
Close all producers and channels created for the connected
(create-client-service brokers conf)
(create-connector bootstrap-brokers
{:keys [acks io-threads batch-num-messages
queue-buffering-max-ms default-replication-factor
default-partitions batch-fail-message-over-limit
batch-byte-limit topic-auto-create flush-on-write]
:or {queue-buffering-max-ms 500
acks 0
default-replication-factor 1
flush-on-write false
topic-auto-create true
default-partitions 2
batch-byte-limit 10485760
batch-num-messages 25
io-threads 10
batch-fail-message-over-limit true}
:as conf})
Creates a connector for sending to kafka All sends are asynchronous, unrecoverable errors are saved to a local retry cache and also notified to the producer-error-ch with data of type ErrorCtx
Creates a connector for sending to kafka All sends are asynchronous, unrecoverable errors are saved to a local retry cache and also notified to the producer-error-ch with data of type ErrorCtx
(create-topic-request {:keys [metadata-connector]} topic)
Send a create topic request to a random broker and return true, otherwise throw an exception connector == kafka-clj.metadata/connector
Send a create topic request to a random broker and return true, otherwise throw an exception connector == kafka-clj.metadata/connector
(get-metadata-error-ch connector)
(handle-async-topic-messages state topic msgs)
Send messages from the same topic to a producer
Send messages from the same topic to a producer
(kafka-response state send-cache resp)
Handles the response input stream for each producer. Takes the state and resp:ProduceResponse. If the error-code > 0 and a send-cache is in state, the messages are retrieved from the persist and if still in persist, the messages are sent using handle-async-topic-messages.
Handles the response input stream for each producer. Takes the state and resp:ProduceResponse. If the error-code > 0 and a send-cache is in state, the messages are retrieved from the persist and if still in persist, the messages are sent using handle-async-topic-messages.
(producer-error-ch connector)
(read-response state send-cache conn)
(select-partition-rc {:keys [metadata-connector]} topic)
Returns a partition-rc {:host :port} that has not been blacklisted otherwise nil
Returns a partition-rc {:host :port} that has not been blacklisted otherwise nil
(send-msg {:keys [state metadata-connector msg-ch] :as connector} topic bts)
Sends a message async and returns false if the connection is closed Any errors uncrecoverable errors shuold be readh from the producer-error-ch
Sends a message async and returns false if the connection is closed Any errors uncrecoverable errors shuold be readh from the producer-error-ch
(try-create-topic {:keys [state] :as connector} topic)
Return true if create-topic was called, otherwise an exception is thrown
Return true if create-topic was called, otherwise an exception is thrown
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close