
dvlopt.kafka.out

Kafka producers.

This API can also be used by mock producers (cf. dvlopt.kafka.out.mock namespace).


close

(close producer)
(close producer options)

Closes the producer and releases all associated resources.

A map of options may be given :

  :dvlopt.kafka/timeout
   Blocks until all requests finish or the given timeout is reached, failing unsent and unacked records immediately.
   Cf. `dvlopt.kafka` for a description of time intervals.
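A minimal sketch of closing a producer with a bounded timeout (the node address is illustrative, and the `[5 :seconds]` interval follows the time-interval format described in `dvlopt.kafka`):

```clojure
(require '[dvlopt.kafka.out :as K.out])

;; Illustrative producer pointing at a local broker.
(def producer
  (K.out/producer {:dvlopt.kafka/nodes [["localhost" 9092]]}))

;; Blocks at most 5 seconds, then fails unsent and unacked records.
(K.out/close producer
             {:dvlopt.kafka/timeout [5 :seconds]})
```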

flush

(flush producer)

Flushes the producer.

Sends all buffered messages immediately, even if "linger.ms" (producer configuration) is greater than 0, and blocks
until completion. Other threads can continue sending messages but no guarantee is made that they will be part of the
current flush.
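A sketch of forcing delivery before a checkpoint; `producer` is assumed to have been built with `producer` from this namespace, and the topic name is illustrative:

```clojure
(require '[dvlopt.kafka.out :as K.out])

;; Buffer a record (it may sit in the buffer while "linger.ms" elapses).
(K.out/send producer
            {:dvlopt.kafka/topic "my-topic"
             :dvlopt.kafka/value 42})

;; Blocks until every record buffered so far is actually sent.
(K.out/flush producer)
```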

metrics

(metrics producer)

Requests metrics about this producer.

Returns a map of metric group name -> (map of metric name -> map) containing :

  :dvlopt.kafka/description (when available)
   String description for human consumption.

  :dvlopt.kafka/properties
   Map of keywords to strings, additional arbitrary key-values.

  :dvlopt.kafka/floating-value
   Numerical value.
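Given the nested map shape described above, the metrics can be walked with plain destructuring. A sketch (assumes `producer` was built with `producer` from this namespace):

```clojure
(require '[dvlopt.kafka.out :as K.out])

;; Print the numerical value of every metric, grouped by metric group.
(doseq [[group metric->data] (K.out/metrics producer)
        [metric data]        metric->data]
  (println group
           metric
           (:dvlopt.kafka/floating-value data)))
```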

partitions

(partitions producer topic)

Requests a list of partitions for a given topic.

<!> Blocks forever if the topic does not exist and dynamic creation has been disabled.


Returns a list of maps containing :

  :dvlopt.kafka/leader-node (may be absent)
   Cf. `dvlopt.kafka` for descriptions of Kafka nodes.

  :dvlopt.kafka/partition
   Partition number.

  :dvlopt.kafka/replica-nodes
   Cf. `dvlopt.kafka` for descriptions of Kafka nodes.

  :dvlopt.kafka/topic
   Topic name.
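A sketch of listing the partition numbers of a topic (the topic name is illustrative):

```clojure
(require '[dvlopt.kafka.out :as K.out])

;; Beware: blocks forever if "my-topic" does not exist and dynamic
;; topic creation is disabled.
(map :dvlopt.kafka/partition
     (K.out/partitions producer "my-topic"))
```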

producer

(producer)
(producer options)

Builds a Kafka producer.

Producers are thread-safe and it is efficient to share one between multiple threads.

A map of options may be given :

  ::configuration
    Map of Kafka producer properties.
    Cf. https://kafka.apache.org/documentation/#producerconfigs

  :dvlopt.kafka/nodes
    List of [host port].

  :dvlopt.kafka/serializer.key
  :dvlopt.kafka/serializer.value
   Cf. `dvlopt.kafka` for description of serializers.


Ex. (producer {::configuration                {"client.id" "my_id"}
               :dvlopt.kafka/nodes            [["some_host" 9092]]
               :dvlopt.kafka/serializer.key   :string
               :dvlopt.kafka/serializer.value (fn [data _metadata]
                                                (some-> data
                                                        nippy/freeze))})

send

(send producer record)
(send producer record callback)

In normal mode, asynchronously sends a record to Kafka via a producer and calls the optional callback
on acknowledgment or error. The callback needs to accept 2 arguments : an exception in case of failure and
metadata in case of success. It will be executed on the IO thread of the producer, so it should be fast or
pass data to another thread.

This function returns a future, useful when synchronous behaviour is needed. Dereferencing the future will throw if a
failure occurred, or return the same metadata passed to the optional callback.

A record is a map possibly containing :

  :dvlopt.kafka/headers
  :dvlopt.kafka/key
  :dvlopt.kafka/partition
  :dvlopt.kafka/timestamp
  :dvlopt.kafka/topic     (mandatory)
  :dvlopt.kafka/value
   Cf. `dvlopt.kafka` section "Records"

If the partition is missing, it is automatically selected based on the hash of the key.

Metadata is a map containing :dvlopt.kafka/topic and when available :

  :dvlopt.kafka/offset
  :dvlopt.kafka/partition
  :dvlopt.kafka/timestamp

In transactional mode, there is no need for callbacks or checking the futures because a call to `trx-commit` will
throw the exception from the last failed send. When such a failure occurs, the easiest way to deal with it is simply
to restart the transactional producer.


Ex. (send producer
          {:dvlopt.kafka/key   "some-key"
           :dvlopt.kafka/topic "my-topic"
           :dvlopt.kafka/value 42}
          (fn callback [exception meta]
            (when-not exception
              (println :committed meta))))
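For synchronous behaviour, the returned future can simply be dereferenced instead of supplying a callback. A sketch (topic name is illustrative):

```clojure
(require '[dvlopt.kafka.out :as K.out])

;; Blocks until acknowledgment; throws if the send failed.
(let [metadata @(K.out/send producer
                            {:dvlopt.kafka/topic "my-topic"
                             :dvlopt.kafka/value 42})]
  (println :offset (:dvlopt.kafka/offset metadata)))
```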

trx-abort

(trx-abort producer)

Aborts the ongoing transaction.


trx-begin

(trx-begin producer)

Must be called exactly once at the start of each new transaction.

<!> `trx-init` must be called before any transaction. <!>

trx-commit

(trx-commit producer)

Requests a commit of the ongoing transaction.

A transaction succeeds only if every step succeeds.

If sending any record hit an irrecoverable error, this function will rethrow that exception and the transaction
will not be committed.

trx-init

(trx-init producer)

The producer configuration "transactional.id" needs to be set.

Based on this id, ensures any transaction committed by a previous instance is completed or any pending
transaction is aborted. Furthermore, it prepares the producer for future transactions.

This function needs to be called exactly once prior to doing any new transactions.

Of course, brokers need to support transactions.
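Putting the transactional functions together, a minimal lifecycle looks like this (a sketch; the "transactional.id", node address, and topic name are illustrative, and brokers must support transactions):

```clojure
(require '[dvlopt.kafka.out :as K.out])

;; "transactional.id" must be set for transactional use.
(def producer
  (K.out/producer {::K.out/configuration {"transactional.id" "my-trx-id"}
                   :dvlopt.kafka/nodes   [["localhost" 9092]]}))

;; Called exactly once, before any transaction.
(K.out/trx-init producer)

;; One transaction.
(K.out/trx-begin producer)
(K.out/send producer
            {:dvlopt.kafka/topic "my-topic"
             :dvlopt.kafka/value 42})
;; Rethrows the exception from the last failed send, if any.
(K.out/trx-commit producer)
```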

trx-offsets

(trx-offsets producer consumer-group topic-partition->offset)

Adds offsets for a consumer group id as part of the ongoing transaction.

Very useful for a consume-transform-produce use case.

The consumer polls records without committing its offsets ("enable.auto.commit" must be set to false). After some computation,
while in a transaction, the producer publishes results as well as the offsets (i.e. for each topic partition, the offset of the
last processed record + 1). This guarantees exactly-once semantics.


Ex. ;; commits offset 65 for "some-topic" partition 0

    (trx-offsets producer
                 "my-consumer-group"
                 {["some-topic" 0] 65})
