Everything related to Kafka producers.
(close producer)
(close producer timeout-ms)
Tries to close the producer cleanly.
Blocks until all sends are done or the optional timeout has elapsed.
@ producer Kafka producer.
@ timeout-ms Timeout in milliseconds.
=> nil
Throws
org.apache.kafka.common.errors
InterruptException
When the thread is interrupted while blocked.
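Because `close` blocks until pending sends are done, it is commonly paired with `try`/`finally` so that the producer is released even when the work fails. A minimal sketch, assuming the namespace documented here is `milena.produce`; `with-producer` is a hypothetical helper, not part of the library.

```clojure
(require '[milena.produce :as produce])   ; assumed namespace name

(defn with-producer
  "Builds a producer from `opts`, calls `(f producer)` and always closes the
   producer afterwards, waiting at most `timeout-ms` for pending sends."
  [opts timeout-ms f]
  (let [producer (produce/make opts)]
    (try
      (f producer)
      (finally
        (produce/close producer timeout-ms)))))

(comment
  ;; Hypothetical usage: the producer is closed once the fn returns or throws.
  (with-producer {:nodes [["localhost" 9092]]}
                 10000
                 (fn [producer]
                   (produce/metrics producer))))
```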
(commit producer record)
(commit producer record callback)
In normal mode, asynchronously commits a message to Kafka via a producer and calls the optional callback on acknowledgment or error.
In transactional mode, behaves synchronously and throws right away in case of failure. When a recoverable error occurs,
the user should call trx-abort. Otherwise, the producer should be closed as it enters a defunct state.
<!> Callbacks will be executed on the IO thread of the producer, so they should be fast.
@ producer Kafka producer.
@ record
Cf. milena.interop.java/producer-record
@ callback
Cf. milena.interop.java/callback
=> Future resolving to the metadata or throwing if an error occurred.
Cf. milena.interop.clj/record-metadata
Possible exceptions are :
java.lang
IllegalStateException
When a 'transactional.id' has been configured and no transaction has been started.
org.apache.kafka.common.errors
AuthenticationException
When authentication fails.
InterruptException
When the thread is interrupted while blocked.
SerializationException
When the key or value are not valid objects given the configured serializers.
TimeoutException
When the time taken for fetching metadata or allocating memory for the record has surpassed 'max.block.ms'.
KafkaException
Any other unrecoverable error.
Possible exceptions in transactions :
org.apache.kafka.common.errors
ProducerFencedException (unrecoverable)
When another producer with the same transaction id is already active.
OutOfOrderSequenceException (unrecoverable)
When the broker receives an unexpected sequence number from the producer, which means that data may have been lost.
UnsupportedVersionException (unrecoverable)
When the transaction API is not supported by the broker.
UnsupportedForMessageFormatException (recoverable)
When the message format of the destination topic is not upgraded to 0.11.0.0.
Ex. (commit producer
            {:topic "my-topic"
             :key   "some-key"
             :value 42}
            (fn callback [exception meta]
              (when-not exception
                (println :committed meta))))
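Since callbacks run on the IO thread of the producer, one way to keep them fast is to hand the outcome over to a queue and do the real work elsewhere. The sketch below assumes the `[exception meta]` callback shape shown in the example above; `enqueueing-callback` and the `{:ok ...}` / `{:error ...}` maps are hypothetical names chosen for illustration.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn enqueueing-callback
  "Returns a callback that only pushes the outcome of a commit onto `queue`,
   leaving any slow processing to whichever thread consumes the queue."
  [^LinkedBlockingQueue queue]
  (fn callback [exception meta]
    (.offer queue
            (if exception
              {:error exception}
              {:ok meta}))))

(comment
  ;; Hypothetical usage, assuming `producer` was built with `make`
  ;; and that this namespace is `milena.produce`.
  (require '[milena.produce :as produce])
  (def outcomes (LinkedBlockingQueue.))
  (produce/commit producer
                  {:topic "my-topic" :key "some-key" :value 42}
                  (enqueueing-callback outcomes))
  ;; Elsewhere, on a thread that is allowed to block.
  (.take outcomes))
```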
(flush producer)
Flushes the producer.
Sends all buffered messages immediately, even if 'linger.ms' is greater than 0, and blocks until completion. Other threads can continue sending messages, but no guarantee is made that they will be part of the flush.
@ producer Kafka producer.
=> nil
Throws
org.apache.kafka.common.errors
InterruptException
When the thread is interrupted while flushing.
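As an illustration, flushing is handy when a batch of records must be known to be sent before moving on. A rough sketch for normal (non-transactional) mode, assuming the namespace is `milena.produce` and that the futures returned by `commit` can be dereferenced; `commit-batch` is a hypothetical helper.

```clojure
(require '[milena.produce :as produce])   ; assumed namespace name

(defn commit-batch
  "Commits every record, flushes, then returns the metadata of each record
   in order. Throws when dereferencing a record whose send failed."
  [producer records]
  (let [futures (mapv #(produce/commit producer %) records)]
    ;; Blocks until everything buffered so far has been sent.
    (produce/flush producer)
    ;; After the flush, the futures should already be completed.
    (mapv deref futures)))
```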
(make)
(make {:as opts
:keys [nodes config serializer serializer-key serializer-value]
:or {nodes [["localhost" 9092]]
serializer M.serialize/byte-array
serializer-key serializer
serializer-value serializer}})
Builds a Kafka producer.
Producers are thread-safe and it is efficient to share one between multiple threads.
@ opts (nilable)
  {:nodes (nilable)
    List of [host port].
   :config (nilable)
    Kafka configuration map.
    Cf. https://kafka.apache.org/documentation/#producerconfigs
   :serializer (nilable)
    Kafka serializer or fn eligible for becoming one.
    Cf. milena.serialize
        milena.serialize/make
   :serializer-key (nilable)
    Defaulting to ?serializer.
   :serializer-value (nilable)
    Defaulting to ?serializer.}
=> org.apache.kafka.clients.producer.KafkaProducer
Ex. (make {:nodes            [["some_host" 9092]]
           :config           {:client.id "my_id"}
           :serializer-key   milena.serialize/string
           :serializer-value (fn [_ data]
                               (nippy/freeze data))})
(metrics producer)
Gets metrics about this producer.
@ producer Kafka producer.
=> Cf. milena.interop.clj/metrics
(partitions producer topic)
Gets a list of partitions for a given topic.
<!> Blocks forever if the topic does not exist and dynamic creation has been disabled.
@ producer Kafka producer.
@ topic Topic name.
=> List of partitions.
Cf. interop.clj/partition-info
Throws
org.apache.kafka.common.errors
WakeupException
When `unblock` is called before or while this fn is called.
InterruptException
When the calling thread is interrupted.
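Given that this call may block forever, one option is to run it on another thread and give up after a deadline. A minimal sketch using only `clojure.core`; `call-with-timeout` is a hypothetical helper, and the usage assumes the namespace is `milena.produce`. Cancelling the future interrupts the blocked thread, which should surface as the InterruptException listed above.

```clojure
(defn call-with-timeout
  "Runs the no-arg fn `f` on another thread and waits at most `timeout-ms`.
   Returns the result of `f`, or :timed-out after cancelling the call."
  [timeout-ms f]
  (let [fut    (future (f))
        result (deref fut timeout-ms :timed-out)]
    (when (= result :timed-out)
      ;; Interrupts the blocked thread so that it does not linger.
      (future-cancel fut))
    result))

(comment
  ;; Hypothetical usage, assuming `producer` was built with `make`.
  (require '[milena.produce :as produce])
  (call-with-timeout 5000
                     #(produce/partitions producer "my-topic")))
```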
(trx-abort producer)
Aborts the ongoing transaction.
@ producer Kafka producer.
=> producer
Throws
java.lang
IllegalStateException
When no 'transactional.id' has been configured.
org.apache.kafka.common.errors
ProducerFencedException
When another producer with the same transaction id is already active.
UnsupportedVersionException
When the broker does not support transactions.
AuthorizationException
When the 'transactional.id' is not authorized.
KafkaException
Any other unrecoverable error.
(trx-begin producer)
Must be called exactly once at the start of each new transaction.
<!> trx-init must be called before any transaction.
@ producer Kafka producer.
=> producer
Throws
java.lang
IllegalStateException
When no 'transactional.id' has been configured.
org.apache.kafka.common.errors
ProducerFencedException
When another producer with the same transaction id is already active.
UnsupportedVersionException
When the broker does not support transactions.
AuthorizationException
When 'transactional.id' is not authorized.
KafkaException
Any other unrecoverable error.
(trx-commit producer)
Commits the ongoing transaction.
A transaction succeeds only if every step succeeds.
If any record commit hit an unrecoverable error, this function will rethrow that exception and the transaction will not be committed.
@ producer Kafka producer.
=> producer
Throws
java.lang
IllegalStateException
When no 'transactional.id' has been configured.
org.apache.kafka.common.errors
ProducerFencedException
When another producer with the same transaction id is already active.
UnsupportedVersionException
When the broker does not support transactions.
AuthorizationException
When the 'transactional.id' is not authorized.
KafkaException
Any other unrecoverable error.
(trx-init producer)
When 'transactional.id' is set, needs to be called exactly once before doing any transaction.
Based on this id, ensures any previous transaction committed by a previous instance is completed or any pending transaction is aborted. Furthermore, it prepares the producer for future ones.
@ producer Kafka producer.
=> producer
Throws
java.lang
IllegalStateException
When no 'transactional.id' has been configured.
org.apache.kafka.common.errors
UnsupportedVersionException
When the broker does not support transactions.
AuthorizationException
When the 'transactional.id' is not authorized.
KafkaException
Any other unrecoverable error.
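The transactional functions are meant to be used together. The sketch below mirrors the pattern shown in the Javadoc of the underlying KafkaProducer: close the producer on errors that cannot be recovered from, abort the transaction on any other KafkaException. It assumes the namespace is `milena.produce` and that `commit` throws synchronously in transactional mode as described above; `unrecoverable?` and `commit-in-transaction` are hypothetical helpers, and the set of exceptions treated as unrecoverable is an assumption to adapt.

```clojure
(require '[milena.produce   :as produce]     ; assumed namespace name
         '[milena.serialize :as serialize])

(import '(org.apache.kafka.common KafkaException)
        '(org.apache.kafka.common.errors AuthorizationException
                                         OutOfOrderSequenceException
                                         ProducerFencedException
                                         UnsupportedVersionException))

(defn unrecoverable?
  "True for errors after which the producer is considered defunct."
  [e]
  (or (instance? ProducerFencedException e)
      (instance? OutOfOrderSequenceException e)
      (instance? UnsupportedVersionException e)
      (instance? AuthorizationException e)))

(defn commit-in-transaction
  "Commits `records` atomically. `trx-init` must already have been called
   once on `producer`. Rethrows after aborting or closing."
  [producer records]
  (produce/trx-begin producer)
  (try
    (doseq [record records]
      (produce/commit producer record))
    (produce/trx-commit producer)
    (catch KafkaException e
      (if (unrecoverable? e)
        (produce/close producer)        ; defunct, nothing else to do
        (produce/trx-abort producer))   ; a new transaction may be started
      (throw e))))

(comment
  ;; Hypothetical usage; "my-trx-id" is a made-up transactional id.
  (def producer
    (produce/make {:config     {:transactional.id "my-trx-id"}
                   :serializer serialize/string}))
  (produce/trx-init producer)   ; once, before any transaction
  (commit-in-transaction producer
                         [{:topic "my-topic" :key "a" :value "1"}
                          {:topic "my-topic" :key "b" :value "2"}]))
```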
(trx-offsets producer group-id offsets)
Commits offsets for a consumer group id as part of the ongoing transaction.
Very useful for a consume-transform-produce use case.
The consumer polls records without committing its offsets ('enable.auto.commit' must be set to false). After some computation, while in a transaction, the producer publishes results as well as the offsets (i.e. for each topic partition, the offset of the last processed record + 1). This guarantees exactly-once semantics.
@ producer Kafka producer.
=> producer
Throws
java.lang
IllegalStateException
When no 'transactional.id' has been configured.
org.apache.kafka.common.errors
ProducerFencedException
When another producer with the same transaction id is already active.
UnsupportedVersionException
When the broker does not support transactions.
AuthorizationException
When the 'transactional.id' is not authorized.
KafkaException
Any other unrecoverable error.
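The "last processed record + 1" rule can be sketched as a small pure function. The shape of `offsets` is not documented above, so the `[topic partition] -> offset` map used here is an assumption, as are the `:topic`, `:partition` and `:offset` keys of the consumed records; `next-offsets` is a hypothetical helper.

```clojure
(defn next-offsets
  "Given processed records (maps with :topic, :partition and :offset),
   returns a map of [topic partition] -> offset of the last processed
   record + 1, keeping the highest offset seen per topic partition."
  [records]
  (reduce (fn [offsets {:keys [topic partition offset]}]
            (update offsets
                    [topic partition]
                    (fnil max 0)
                    (inc offset)))
          {}
          records))

(comment
  ;; Hypothetical usage inside an ongoing transaction, assuming the
  ;; namespace is `milena.produce` and `records` were polled by a consumer.
  (require '[milena.produce :as produce])
  (produce/trx-offsets producer
                       "my-group"
                       (next-offsets records)))
```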