Kafka producers.
This API can also be used by mock producers (cf. dvlopt.kafka.out.mock namespace).
(close producer)
(close producer options)
Closes the producer and releases all associated resources.
A map of options may be given :
:dvlopt.kafka/timeout
Blocks until all requests finish or the given timeout is reached, failing unsent and unacked records immediately.
Cf. `dvlopt.kafka` for description of time intervals.
(flush producer)
Flushes the producer.
Sends all buffered messages immediately, even if "linger.ms" (producer configuration) is greater than 0, and blocks until completion. Other threads can continue sending messages but no guarantee is made that they will be part of the current flush.
(metrics producer)
Requests metrics about this producer.
Returns a map of metric group name -> (map of metric name -> map) containing :
:dvlopt.kafka/description (when available) String description for human consumption.
:dvlopt.kafka/properties Map of keywords to strings, additional arbitrary key-values.
:dvlopt.kafka/floating-value Numerical value.
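The nested shape described above can be awkward to scan, so here is a minimal sketch that flattens it into one value per metric. The helper name `metric-values` and the sample data are hypothetical; in particular, the use of strings for group and metric names is an assumption, not something this page confirms.

```clojure
(defn metric-values
  "Flattens {group-name {metric-name metric}} into
   {[group-name metric-name] floating-value}."
  [metrics]
  (into {}
        (for [[group-name metric-name->metric] metrics
              [metric-name metric]             metric-name->metric]
          [[group-name metric-name]
           (:dvlopt.kafka/floating-value metric)])))

;; Hypothetical sample, shaped like the return value described above.
(def sample-metrics
  {"producer-metrics"
   {"record-send-rate" {:dvlopt.kafka/description    "Average records sent per second."
                        :dvlopt.kafka/properties     {:client-id "my_id"}
                        :dvlopt.kafka/floating-value 12.5}}})

(metric-values sample-metrics)
```

With a real producer, the argument would presumably be the result of `(metrics producer)`.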
(partitions producer topic)
Requests a list of partitions for a given topic.
<!> Blocks forever if the topic does not exist and dynamic creation has been disabled.
Returns a list of maps containing :
:dvlopt.kafka/leader-node (may be absent) Cf. `dvlopt.kafka` for descriptions of Kafka nodes.
:dvlopt.kafka/partition Partition number.
:dvlopt.kafka/replica-nodes Cf. `dvlopt.kafka` for descriptions of Kafka nodes.
:dvlopt.kafka/topic Topic name.
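Since `:dvlopt.kafka/leader-node` may be absent, one use of the returned list is spotting partitions that currently have no leader. The following is only a sketch: `leaderless-partitions` is a hypothetical helper, and the `:some-node` values are placeholders standing in for real node descriptions (which are documented in `dvlopt.kafka`, not here).

```clojure
(defn leaderless-partitions
  "Returns the partition numbers of entries lacking :dvlopt.kafka/leader-node."
  [partitions]
  (->> partitions
       (remove :dvlopt.kafka/leader-node)
       (mapv :dvlopt.kafka/partition)))

;; Hypothetical sample, shaped like the list described above.
(def sample-partitions
  [{:dvlopt.kafka/topic         "my-topic"
    :dvlopt.kafka/partition     0
    :dvlopt.kafka/leader-node   :some-node
    :dvlopt.kafka/replica-nodes [:some-node]}
   {:dvlopt.kafka/topic         "my-topic"
    :dvlopt.kafka/partition     1
    :dvlopt.kafka/replica-nodes [:some-node]}])

(leaderless-partitions sample-partitions)
;; => [1]
```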
(producer)
(producer options)
Builds a Kafka producer.
Producers are thread-safe and it is efficient to share one between multiple threads.
A map of options may be given :
::configuration Map of Kafka producer properties. Cf. https://kafka.apache.org/documentation/#producerconfigs
:dvlopt.kafka/nodes List of [host port].
:dvlopt.kafka/serializer.key
:dvlopt.kafka/serializer.value
Cf. `dvlopt.kafka` for description of serializers.
Ex. (producer {::configuration                {"client.id" "my_id"}
               :dvlopt.kafka/nodes            [["some_host" 9092]]
               :dvlopt.kafka/serializer.key   :string
               :dvlopt.kafka/serializer.value (fn [data _metadata]
                                                (some-> data
                                                        nippy/freeze))})
(send producer record)
(send producer record callback)
In normal mode, asynchronously sends a record to Kafka via a producer and calls the optional callback on acknowledgment or error. The callback needs to accept 2 arguments : an exception in case of failure and metadata in case of success. It will be executed on the IO thread of the producer, so it should be fast or pass data to another thread.
This function returns a future, which can be dereferenced when synchronous behaviour is needed. Dereferencing the future will throw if a failure occurred; otherwise it provides the same metadata passed to the optional callback.
A record is a map possibly containing :
:dvlopt.kafka/headers
:dvlopt.kafka/key
:dvlopt.kafka/partition
:dvlopt.kafka/timestamp
:dvlopt.kafka/topic (mandatory)
:dvlopt.kafka/value
Cf. `dvlopt.kafka` section "Records"
If the partition is missing, it is automatically selected based on the hash of the key.
Metadata is a map containing :dvlopt.kafka/topic and when available :
:dvlopt.kafka/offset
:dvlopt.kafka/partition
:dvlopt.kafka/timestamp
In transactional mode, there is no need for callbacks or checking the futures because a call to `trx-commit` will throw the exception from the last failed send. When such a failure occurs, the easiest way to deal with it is simply to restart the transactional producer.
Ex. (send producer
          {:dvlopt.kafka/key   "some-key"
           :dvlopt.kafka/topic "my-topic"
           :dvlopt.kafka/value 42}
          (fn callback [exception meta]
            (when-not exception
              (println :committed meta))))
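Because the callback runs on the producer's IO thread, one way to keep it fast is to have it do nothing but hand the outcome to another thread. The sketch below does this with a plain Clojure promise. `promise-callback` and the `:status` / `:exception` / `:metadata` keys are hypothetical names chosen for illustration, and the callback is invoked by hand here to simulate an acknowledgment, so no broker is needed.

```clojure
(defn promise-callback
  "Returns a 2-argument callback that only delivers the outcome to `result`,
   leaving any real work to whichever thread derefs the promise."
  [result]
  (fn [exception metadata]
    (deliver result
             (if exception
               {:status :failed :exception exception}
               {:status :acked  :metadata  metadata}))))

;; Simulated acknowledgment: the callback is called directly with no
;; exception and a metadata map shaped like the one described above.
(def result (promise))

((promise-callback result)
 nil
 {:dvlopt.kafka/topic     "my-topic"
  :dvlopt.kafka/partition 0
  :dvlopt.kafka/offset    42})

(:status @result)
;; => :acked
```

With a real producer the callback would be passed as the third argument, as in `(send producer record (promise-callback result))`.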
(trx-abort producer)
Aborts the ongoing transaction.
(trx-begin producer)
Must be called exactly once at the start of each new transaction.
<!> `trx-init` must be called before any transaction. <!>
(trx-commit producer)
Requests a commit of the ongoing transaction.
A transaction succeeds only if every step succeeds.
If any record commit hits an irrecoverable error, this function will rethrow that exception and the transaction will not be committed.
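The begin / send / commit sequence, with an abort on failure, can be sketched as a small wrapper. This is an illustration under assumptions, not the library's API: `run-transaction` is a hypothetical helper, and the transaction steps are injected as plain functions so the sketch runs without a broker. Whether aborting is the right reaction depends on the error; for failed sends, restarting the transactional producer may be the simpler option.

```clojure
(defn run-transaction
  "Calls `begin!`, then `body!`, then `commit!`, returning the result of
   `body!`. If `body!` or `commit!` throws, calls `abort!` and rethrows."
  [{:keys [begin! commit! abort!]} body!]
  (begin!)
  (try
    (let [ret (body!)]
      (commit!)
      ret)
    (catch Exception e
      (abort!)
      (throw e))))

;; Stand-ins that only record the order of calls.
(def calls (atom []))

(def fake-trx
  {:begin!  #(swap! calls conj :begin)
   :commit! #(swap! calls conj :commit)
   :abort!  #(swap! calls conj :abort)})

(run-transaction fake-trx #(swap! calls conj :send))

@calls
;; => [:begin :send :commit]
```

With a real producer on which `trx-init` has already been called, the map would presumably hold `#(trx-begin producer)`, `#(trx-commit producer)` and `#(trx-abort producer)`, and `body!` would call `send`.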
(trx-init producer)
The producer configuration "transactional.id" needs to be set.
Based on this id, ensures any previous transaction committed by a previous instance is completed or any pending transaction is aborted. Furthermore, it prepares the producer for future ones.
This function needs to be called exactly once prior to doing any new transactions.
Of course, brokers need to support transactions.
(trx-offsets producer consumer-group topic-partition->offset)
Adds offsets for a consumer group id as part of the ongoing transaction.
Very useful for a consume-transform-produce use case.
The consumer polls records without committing its offsets ("enable.auto.commit" must be set to false). After some computation, while in a transaction, the producer publishes results as well as the offsets (i.e. for each topic partition, the offset of the last processed record + 1). This guarantees exactly-once semantics.
Ex. ;; commits offset 65 for "some-topic" partition 0
(trx-offsets producer
"my-consumer-group"
{["some-topic" 0] 65})
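The "offset of the last processed record + 1" rule can be sketched as a pure function that builds the `topic-partition->offset` argument from a batch of processed records. `next-offsets` is a hypothetical helper, and the sketch assumes consumed records are maps carrying `:dvlopt.kafka/topic`, `:dvlopt.kafka/partition` and `:dvlopt.kafka/offset`, which this page does not confirm.

```clojure
(defn next-offsets
  "For each [topic partition] seen in `records`, returns the highest
   processed offset + 1, i.e. the offset to commit."
  [records]
  (reduce (fn [acc {:dvlopt.kafka/keys [topic partition offset]}]
            (update acc [topic partition] (fnil max 0) (inc offset)))
          {}
          records))

;; Hypothetical batch of already processed records.
(def processed
  [{:dvlopt.kafka/topic "some-topic" :dvlopt.kafka/partition 0 :dvlopt.kafka/offset 63}
   {:dvlopt.kafka/topic "some-topic" :dvlopt.kafka/partition 0 :dvlopt.kafka/offset 64}
   {:dvlopt.kafka/topic "some-topic" :dvlopt.kafka/partition 1 :dvlopt.kafka/offset 10}])

(next-offsets processed)
;; => {["some-topic" 0] 65, ["some-topic" 1] 11}
```

The resulting map has the same shape as the third argument in the example above.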