Kafka administration.
Functions accepting a timeout rely on the default request timeout of the admin client when none is provided. Time intervals are described in `dvlopt.kafka`.
(acls admin)
(acls admin acl-filter)
(acls admin acl-filter options)
Requests the ACLs matching the given filter.
Returns a future resolving to a list of ACLs or throwing in case of error.
The ACL filter is a nilable map which may contain :
::operation Type of ACL, one of #{:all (default) :alter :alter-configuration :any :cluster-action :create :delete :describe :describe-configuration :idempotent-write :read :write}.
::name-pattern [Filter-Type String] where Filter-Type is one of :
:any (default)
String unneeded as it matches anything.
:exactly
Will match the given string and nothing else.
:match
Like :exactly and :prefixed but also expands wildcards (ie. "*" like in globs).
:prefixed
Will match anything beginning with the given string.
A nil value will match anything.
::permission One of #{:any (default) :allow :deny}.
::principal Who the ACL applies to (the principal, eg. a user).
::resource-type One of #{:any (default) :cluster :consumer-group :topic :transactional-id}.
:dvlopt.kafka/host Restricts to the given host or none if this option is not provided.
A map of options may be given :
:dvlopt.kafka/timeout Cf. Namespace description
Ex. (acls admin {::permission :allow ::operation :create ::resource-type :topic ::name-pattern [:match "my_*_topic"]} {:dvlopt.kafka/timeout [5 :seconds]})
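The `::` keywords above only resolve inside the documented namespace. As a minimal sketch, assuming that namespace is `dvlopt.kafka.admin` (an assumption, since the name is not spelled out here), the same filter and options can be written from any other namespace with fully qualified keywords:

```clojure
;; Assumption: the documented namespace is `dvlopt.kafka.admin`, so `::operation`
;; written inside it is the same keyword as `:dvlopt.kafka.admin/operation`.
(def acl-filter
  {:dvlopt.kafka.admin/permission    :allow
   :dvlopt.kafka.admin/operation     :create
   :dvlopt.kafka.admin/resource-type :topic
   :dvlopt.kafka.admin/name-pattern  [:match "my_*_topic"]})

;; Options use the `dvlopt.kafka` namespace, as written in the docstring.
(def options
  {:dvlopt.kafka/timeout [5 :seconds]})
```

Both maps are plain data, so they can be built, inspected and reused before being passed to `acls`.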
(admin)
(admin options)
Creates an admin client.
A map of options may be given :
::configuration Map of Kafka admin properties. Cf. https://kafka.apache.org/documentation/#adminclientconfigs
:dvlopt.kafka/nodes List of [host port].
Ex. (admin {::configuration {"client.id" "some_id"} :dvlopt.kafka/nodes [["some_host" 9092]]})
(close admin)
(close admin options)
Closes the admin client and releases all associated resources.
A map of options may be given :
:dvlopt.kafka/timeout Grace period during which current operations will be allowed to complete and no new ones will be accepted. Futures related to unfinished operations will throw when dereferenced. Cf. Namespace description
(cluster admin)
(cluster admin options)
Requests a map containing information about the nodes in the cluster :
:dvlopt.kafka/f*id Future, the id of the cluster.
:dvlopt.kafka/f*nodes Future, a list of the cluster's nodes.
:dvlopt.kafka/f*controller-node Future, the controller node.
A map of options may be given :
:dvlopt.kafka/timeout Cf. Namespace description
(configuration admin resources)
(configuration admin resources options)
Requests the current configuration of the given resources (brokers and/or topics).
Each property points to a map containing :
:dvlopt.kafka/default? Is this a default value ?
:dvlopt.kafka/name Name of the property.
:dvlopt.kafka/read-only? Is this a read-only property ?
:dvlopt.kafka/string-value Value presented as a string.
::sensitive? Is this a sensitive value that should be kept secret ?
A map of options may be given :
:dvlopt.kafka/timeout Cf. Namespace description
Ex. (configuration admin {:dvlopt.kafka/brokers #{"0" "1"} :dvlopt.kafka/topics #{"topic-one" "topic-two"}} {:dvlopt.kafka/timeout [5 :seconds]})
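The property descriptions above lend themselves to simple filtering. The following is a minimal sketch, assuming the resolved result for one resource is a map of property name -> description map (the exact nesting is not spelled out here); `overridden-properties` is a hypothetical helper, not part of the library:

```clojure
(defn overridden-properties
  "Keeps only the properties whose description is not flagged as a default value.
   Hypothetical helper working on a map of property name -> description map."
  [property->description]
  (into {}
        (remove (fn [[_name description]]
                  (:dvlopt.kafka/default? description)))
        property->description))
```

This is handy for spotting which settings of a broker or topic were explicitly changed.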
(configure admin configurations)
(configure admin configurations options)
Requests modifications in the configuration of the given resources (brokers and/or topics).
Cf. https://kafka.apache.org/documentation/#brokerconfigs https://kafka.apache.org/documentation/#topicconfigs
Returns a map containing the involved brokers and topics, pointing to futures throwing an exception in case of failure.
A map of options may be given :
:dvlopt.kafka/timeout Cf. Namespace description
Ex. (configure admin {::configuration.brokers {"0" {"delete.topic.enable" true}} ::configuration.topics {"my-topic" {"cleanup.policy" "compact"}}} {:dvlopt.kafka/timeout [5 :seconds]})
(consumer-group-offsets admin consumer-group)
(consumer-group-offsets admin consumer-group options)
For the given consumer group, requests a map of [topic partition] -> map containing the current :dvlopt.kafka/offset.
A map of options may be given :
:dvlopt.kafka/timeout Cf. Namespace description
:dvlopt.kafka/topic-partitions Restricts result to the requested [topic partition]'s.
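Since the result is keyed by [topic partition], extracting the offsets of a single topic is a small transformation. A minimal sketch, assuming the map has already been obtained (dereferenced first if it is delivered through a future); `offsets-for-topic` is a hypothetical helper:

```clojure
(defn offsets-for-topic
  "Given a map of [topic partition] -> {:dvlopt.kafka/offset ...}, returns a
   sorted map of partition -> offset for the requested topic only."
  [topic-partition->description topic]
  (into (sorted-map)
        (keep (fn [[[t partition] description]]
                (when (= t topic)
                  [partition (:dvlopt.kafka/offset description)])))
        topic-partition->description))
```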
(consumer-groups admin)
(consumer-groups admin options)
Requests a map of current consumer group -> metadata.
A map of options may be given :
:dvlopt.kafka/timeout Cf. Namespace description
(create-acls admin acls)
(create-acls admin acls options)
Requests the creation of new ACLs (a list of maps akin to the filters described in `acls`).
Returns a map of ACL -> future throwing in case of error.
A map of options may be given :
:dvlopt.kafka/timeout Cf. Namespace description
Ex. (create-acls admin [{::principal "some.user" ::permission :allow ::operation :create ::resource-type :topic ::name-pattern [:prefixed "some_topics"]}] {:dvlopt.kafka/timeout [5 :seconds]})
(create-topics admin new-topics)
(create-topics admin new-topics options)
Requests the creation of new topics.
This operation is supported by brokers with version 0.10.1.0 or higher.
Returns a map of topic name -> future throwing if an error occurred.
New topics are specified by a map of topic name -> argument map.
An argument map consists of either the number of partitions with the replication factor or a manual assignment of partition numbers to replica ids. Although not enforced, it is generally a good idea for all partitions to have the same number of replicas.
Also, each topic can be configured as needed (cf. https://kafka.apache.org/documentation/#topicconfigs).
A map of options may be given :
:dvlopt.kafka/timeout Cf. Namespace description
Ex. (create-topics admin {"first-new-topic" {::number-of-partitions 4 ::replication-factor 1 ::configuration.topic {"cleanup.policy" "compact"}} "second-new-topic" {::replica-assignments {0 [0 1] 1 [2 3]}}} {:dvlopt.kafka/timeout [5 :seconds]})
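Because each topic gets its own future, a failure for one topic does not prevent inspecting the others. A minimal sketch of collecting failures, assuming only that each value in the returned map can be dereferenced and throws on error; `failed-topics` is a hypothetical helper:

```clojure
(defn failed-topics
  "Dereferences every value of a map of topic name -> future and returns a map
   of topic name -> exception for the ones that threw. Blocks until every
   future has completed."
  [topic->future]
  (reduce-kv (fn [failures topic fut]
               (try
                 (deref fut)
                 failures
                 (catch Exception e
                   (assoc failures topic e))))
             {}
             topic->future))
```

An empty result means every topic was handled without error. The same helper applies to any function documented here as returning a map of name -> future throwing on error.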
(delete-acls admin acl-filters)
(delete-acls admin acl-filters options)
Requests the deletion of ACLs following a list of filters (cf. `acls`).
Returns a map of ACL filter -> future resolving to a list of :
[:acl ACL] in case of success, where ACL matches the filter
[:error Exception] in case of failure
A map of options may be given :
:dvlopt.kafka/timeout Cf. Namespace description
Ex. (delete-acls admin [{::principal "some.user" ::permission :allow ::operation :create ::resource-type :topic ::name-pattern [:prefixed "some_topics"]}] {:dvlopt.kafka/timeout [5 :seconds]})
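Each filter resolves to a list mixing `[:acl ACL]` and `[:error Exception]` entries, so callers typically want to separate them. A minimal sketch working on one such resolved list; `split-deletion-results` is a hypothetical helper, not part of the library:

```clojure
(defn split-deletion-results
  "Splits a list of [:acl ACL] and [:error Exception] entries into a map
   holding the deleted ACLs and the errors, preserving order."
  [results]
  (reduce (fn [acc [tag value]]
            (case tag
              :acl   (update acc :deleted conj value)
              :error (update acc :errors conj value)))
          {:deleted []
           :errors  []}
          results))
```

Any tag other than `:acl` or `:error` makes `case` throw, which surfaces unexpected entries instead of silently dropping them.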
(delete-records-before admin topic-partition->offset)
(delete-records-before admin topic-partition->offset options)
Requests the deletion of all records prior to the offset given for each [topic partition].
Returns a map of [topic partition] -> future resolving to {:dvlopt.kafka/lowest-offset ...} or throwing in case of error.
A map of options may be given :
:dvlopt.kafka/timeout Cf. Namespace description
Ex. (delete-records-before admin {["some_topic" 0] 62} {:dvlopt.kafka/timeout [5 :seconds]})
(delete-topics admin topics)
(delete-topics admin topics options)
Requests the deletion of topics.
Returns a map of topic name -> future throwing if an error occurred.
A map of options may be given :
:dvlopt.kafka/timeout Cf. Namespace description
Ex. (delete-topics admin ["some-topic" "another-topic"] {:dvlopt.kafka/timeout [5 :seconds]})
(describe-consumer-groups admin group-ids)
(describe-consumer-groups admin group-ids options)
Given a list of consumer groups to describe, requests a map of consumer group -> detailed metadata.
A map of options may be given :
:dvlopt.kafka/timeout Cf. Namespace description
(describe-topics admin topics)
(describe-topics admin topics options)
Given a list of topics to describe, requests a map of topic name -> future resolving to map containing :
:dvlopt.kafka/internal? Is this topic internal ?
::partition-descriptions List of partition descriptions sorted by partition number, ie. maps containing :
:dvlopt.kafka/leader-node
Cf. `dvlopt.kafka` for descriptions of Kafka nodes.
:dvlopt.kafka/partition
Partition number.
:dvlopt.kafka/replica-nodes
Cf. `dvlopt.kafka` for descriptions of Kafka nodes.
A map of options may be given :
:dvlopt.kafka/timeout Cf. Namespace description
(increase-partitions admin topics->new-partitions)
(increase-partitions admin topics->new-partitions options)
Requests an increase of the number of partitions for the given topics.
<!> Existing data is not repartitioned and the partitioning of new records will be different <!>
For each topic, the new number of partitions has to be specified. The assignments of the new partitions can also be supplied, in the form of a list of lists of replica ids where the first one is the preferred leader.
Returns a map of topic name -> future throwing if an error occurred.
A map of options may be given :
:dvlopt.kafka/timeout Cf. Namespace description
Ex. Given 2 partitions with a replication factor of 3, increases the number to 4 and specifies assignments :
    (increase-partitions admin
                         {"some_topic" {::number-of-partitions 4
                                        ::replica-assignments  [[0 1 2]     ;; replicas for the 3rd partition, replica 0 being the preferred leader
                                                                [1 2 3]]}}  ;; replicas for the 4th partition, replica 1 being the preferred leader
                         {:dvlopt.kafka/timeout [5 :seconds]})
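The warning about partitioning can be illustrated with plain arithmetic. This sketch assumes keyed records are assigned by taking a hash of the key modulo the number of partitions, which is how Kafka's default partitioner behaves for keyed records; the hash value 10 is an arbitrary stand-in and `partition-for` is a hypothetical helper:

```clojure
(defn partition-for
  "Partition chosen for a key hash under a hash-modulo scheme."
  [key-hash number-of-partitions]
  (mod key-hash number-of-partitions))

(partition-for 10 2) ;; => 0, where existing records for this key were written
(partition-for 10 4) ;; => 2, where new records for this key will go
```

After the increase, records sharing a key may therefore be spread over two partitions, which matters for consumers relying on per-key ordering.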
(topics admin)
(topics admin options)
Requests a future resolving to a map of topic name -> map containing :
:dvlopt.kafka/internal? Is this topic internal ?
A map of options may be given :
:dvlopt.kafka/internal? Should internal topics be retrieved ?
:dvlopt.kafka/timeout Cf. Namespace description