
dvlopt.kafka.admin

Kafka administration.

Functions accepting a timeout rely on the default request timeout of the admin client when none is provided. Time intervals are described in `dvlopt.kafka`.


acls

(acls admin)
(acls admin acl-filter)
(acls admin acl-filter options)

Requests needed ACLs.

Returns a future resolving to a list of ACLs or throwing in case of error.

The ACL filter is a nilable map which may contain :

::operation Type of ACL, one of #{:all (default) :alter :alter-configuration :any :cluster-action :create :delete :describe :describe-configuration :idempotent-write :read :write}.

::name-pattern [Filter-Type String] where Filter-Type is one of :

:any (default)
 String unneeded as it matches anything.

:exactly
 Will match the given string and nothing else.

:match
 Like :exactly and :prefixed but also expands wildcards (i.e. "*" as in globs).

:prefixed
 Will match anything beginning with the given string.

A nil value will match anything.

::permission One of #{:any (default) :allow :deny}.

::principal Who the ACL applies to (e.g. a user).

::resource-type One of #{:any (default) :cluster :consumer-group :topic :transactional-id}.

:dvlopt.kafka/host Restricts to the given host or none if this option is not provided.

A map of options may be given :

:dvlopt.kafka/timeout Cf. Namespace description

Ex. (acls admin
          {::permission    :allow
           ::operation     :create
           ::resource-type :topic
           ::name-pattern  [:match "my_*_topic"]}
          {:dvlopt.kafka/timeout [5 :seconds]})

admin

(admin)
(admin options)

Creates an admin client.

A map of options may be given :

::configuration Map of Kafka admin properties. Cf. https://kafka.apache.org/documentation/#adminclientconfigs

:dvlopt.kafka/nodes List of [host port].

Ex. (admin {::configuration     {"client.id" "some_id"}
            :dvlopt.kafka/nodes [["some_host" 9092]]})

close

(close admin)
(close admin options)

Closes the admin client and releases all associated resources.

A map of options may be given :

:dvlopt.kafka/timeout Grace period during which current operations will be allowed to complete and no new ones will be accepted. Futures related to unfinished operations will throw when dereferenced. Cf. Namespace description

cluster

(cluster admin)
(cluster admin options)

Requests a map containing information about the nodes in the cluster :

:dvlopt.kafka/f*id Future, the id of the cluster.

:dvlopt.kafka/f*nodes Future, a list of the cluster's nodes.

:dvlopt.kafka/f*controller-node Future, the controller node.

A map of options may be given :

:dvlopt.kafka/timeout Cf. Namespace description
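
Since each value in the returned map is described as a future, reading the whole description means dereferencing every entry. The following is a minimal sketch that uses a stand-in map built from plain Clojure futures rather than a real call to `cluster`; the placeholder values and the `deref-vals` helper are hypothetical and not part of the library.

```clojure
;; Stand-in for the map described above. With a live cluster, this map
;; would come from `cluster`; the values here are placeholders.
(def cluster-info
  {:dvlopt.kafka/f*id              (future "cluster-id-placeholder")
   :dvlopt.kafka/f*nodes           (future [:node-0 :node-1])
   :dvlopt.kafka/f*controller-node (future :node-0)})

(defn deref-vals
  "Dereferences every value of `m`, blocking until each one resolves.
   Throws if any of the underlying futures throws."
  [m]
  (into {}
        (map (fn [[k f]] [k @f]))
        m))

(deref-vals cluster-info)
```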

configuration

(configuration admin resources)
(configuration admin resources options)

Requests the current configuration of the given resources (brokers and/or topics).

Each property points to a map containing :

:dvlopt.kafka/default? Is this a default value ?

:dvlopt.kafka/name Name of the property.

:dvlopt.kafka/read-only? Is this a read-only property ?

:dvlopt.kafka/string-value Value presented as a string.

::sensitive? Is this a sensitive value that should be kept secret ?

A map of options may be given :

:dvlopt.kafka/timeout Cf. Namespace description

Ex. (configuration admin
                   {:dvlopt.kafka/brokers #{"0"
                                            "1"}
                    :dvlopt.kafka/topics  #{"topic-one"
                                            "topic-two"}}
                   {:dvlopt.kafka/timeout [5 :seconds]})

configure

(configure admin configurations)
(configure admin configurations options)

Requests modifications in the configuration of the given resources (brokers and/or topics).

Cf. https://kafka.apache.org/documentation/#brokerconfigs
    https://kafka.apache.org/documentation/#topicconfigs

Returns a map containing the involved brokers and topics, pointing to futures throwing an exception in case of failure.

A map of options may be given :

:dvlopt.kafka/timeout Cf. Namespace description

Ex. (configure admin
               {::configuration.brokers {"0" {"delete.topic.enable" true}}
                ::configuration.topics  {"my-topic" {"cleanup.policy" "compact"}}}
               {:dvlopt.kafka/timeout [5 :seconds]})

consumer-group-offsets

(consumer-group-offsets admin consumer-group)
(consumer-group-offsets admin consumer-group options)

For the given consumer group, requests a map of [topic partition] -> map containing the current :dvlopt.kafka/offset.

A map of options may be given :

:dvlopt.kafka/timeout Cf. Namespace description

:dvlopt.kafka/topic-partitions Restricts the result to the requested [topic partition] pairs.
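
A result keyed by [topic partition] is sometimes easier to read once regrouped by topic. The sketch below works on a hand-written stand-in for the resolved result (if the call returns a future, it would need dereferencing first); `offsets-by-topic` is a hypothetical helper, not a library function.

```clojure
(defn offsets-by-topic
  "Turns a map of [topic partition] -> {:dvlopt.kafka/offset n}
   into a nested map of topic -> partition -> n."
  [tp->info]
  (reduce-kv (fn [acc [topic partition] info]
               (assoc-in acc [topic partition] (:dvlopt.kafka/offset info)))
             {}
             tp->info))

;; Stand-in data shaped like the description above.
(def sample-offsets
  {["orders" 0]   {:dvlopt.kafka/offset 42}
   ["orders" 1]   {:dvlopt.kafka/offset 7}
   ["payments" 0] {:dvlopt.kafka/offset 0}})

(offsets-by-topic sample-offsets)
```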

consumer-groups

(consumer-groups admin)
(consumer-groups admin options)

Requests a map of current consumer group -> metadata.

A map of options may be given :

:dvlopt.kafka/timeout Cf. Namespace description

create-acls

(create-acls admin acls)
(create-acls admin acls options)

Requests the creation of new ACLs (a list of maps akin to filters described in `acls`).

Returns a map of ACL -> future throwing in case of error.

A map of options may be given :

:dvlopt.kafka/timeout Cf. Namespace description

Ex. (create-acls admin
                 [{::principal     "some.user"
                   ::permission    :allow
                   ::operation     :create
                   ::resource-type :topic
                   ::name-pattern  [:prefixed "some_topics"]}]
                 {:dvlopt.kafka/timeout [5 :seconds]})

create-topics

(create-topics admin new-topics)
(create-topics admin new-topics options)

Requests the creation of new topics.

This operation is supported by brokers with version 0.10.1.0 or higher.

Returns a map of topic name -> future throwing if an error occurred.

New topics are specified by a map of topic name -> argument map.

An argument map consists of either the number of partitions with the replication factor or a manual assignment of partition numbers to replica ids. Although not enforced, it is generally a good idea for all partitions to have the same number of replicas.

Also, each topic can be configured as needed (cf. https://kafka.apache.org/documentation/#topicconfigs).

A map of options may be given :

:dvlopt.kafka/timeout Cf. Namespace description

Ex. (create-topics admin
                   {"first-new-topic"  {::number-of-partitions 4
                                        ::replication-factor   1
                                        ::configuration.topic  {"cleanup.policy" "compact"}}
                    "second-new-topic" {::replica-assignments  {0 [0 1]
                                                                1 [2 3]}}}
                   {:dvlopt.kafka/timeout [5 :seconds]})
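
The recommendation that all partitions have the same number of replicas is not enforced, so it can be checked before sending the request. A minimal sketch, assuming the manual assignment is a map of partition number -> list of replica ids as in the example above; `uniform-replication?` is a hypothetical helper, not part of the library.

```clojure
(defn uniform-replication?
  "True when every partition in a manual assignment (a map of
   partition number -> list of replica ids) has the same number of
   replicas. An empty assignment counts as uniform."
  [replica-assignments]
  (<= (count (distinct (map count (vals replica-assignments))))
      1))

(uniform-replication? {0 [0 1]
                       1 [2 3]})

(uniform-replication? {0 [0 1]
                       1 [2]})
```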

delete-acls

(delete-acls admin acl-filters)
(delete-acls admin acl-filters options)

Requests the deletion of ACLs following a list of filters (cf. `acls`).

Returns a map of ACL filter -> future resolving to a list of :

[:acl ACL] in case of success, where ACL matches the filter

[:error Exception] in case of failure

A map of options may be given :

:dvlopt.kafka/timeout Cf. Namespace description

Ex. (delete-acls admin
                 [{::principal     "some.user"
                   ::permission    :allow
                   ::operation     :create
                   ::resource-type :topic
                   ::name-pattern  [:prefixed "some_topics"]}]
                 {:dvlopt.kafka/timeout [5 :seconds]})
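
Because each future resolves to a list mixing [:acl ACL] and [:error Exception] entries, callers will usually want to separate the two. The sketch below does so on hand-written stand-in data; `split-results` and the result keys `:deleted` and `:errors` are hypothetical names chosen for illustration.

```clojure
(defn split-results
  "Separates a resolved list of [:acl ACL] and [:error Exception]
   entries into deleted ACLs and errors."
  [results]
  (let [{acls :acl errors :error} (group-by first results)]
    {:deleted (mapv second acls)
     :errors  (mapv second errors)}))

;; Stand-in for one resolved future; the ACL maps are placeholders.
(def sample-results
  [[:acl   {:placeholder-acl 1}]
   [:error (ex-info "placeholder failure" {})]
   [:acl   {:placeholder-acl 2}]])

(split-results sample-results)
```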

delete-records-before

(delete-records-before admin topic-partition->offset)
(delete-records-before admin topic-partition->offset options)

Requests the deletion of all records prior to the offset given for each [topic partition].

Returns a map of [topic partition] -> future resolving to {:dvlopt.kafka/lowest-offset ...} or throwing in case of error.

A map of options may be given :

:dvlopt.kafka/timeout Cf. Namespace description

Ex. (delete-records-before admin
                           {["some_topic" 0] 62}
                           {:dvlopt.kafka/timeout [5 :seconds]})

delete-topics

(delete-topics admin topics)
(delete-topics admin topics options)

Requests the deletion of topics.

Returns a map of topic name -> future throwing if an error occurred.

A map of options may be given :

:dvlopt.kafka/timeout Cf. Namespace description

Ex. (delete-topics admin
                   ["some-topic"
                    "another-topic"]
                   {:dvlopt.kafka/timeout [5 :seconds]})
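
Several functions in this namespace return a map whose values are futures that throw on failure. One way to inspect such a map without letting the first failure abort everything is to capture each outcome separately. This is a sketch using plain Clojure futures as stand-ins for a real result; `outcomes` is a hypothetical helper, not a library function.

```clojure
(defn outcomes
  "Dereferences each future in `k->future`, returning a map of
   key -> [:ok value] or key -> [:error exception]."
  [k->future]
  (into {}
        (map (fn [[k f]]
               [k (try
                    [:ok @f]
                    (catch Exception e
                      [:error e]))]))
        k->future))

;; Stand-in for what `delete-topics` is described to return.
(def sample-deletions
  {"some-topic"    (future nil)
   "another-topic" (future (throw (ex-info "placeholder failure" {})))})

(outcomes sample-deletions)
```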

describe-consumer-groups

(describe-consumer-groups admin group-ids)
(describe-consumer-groups admin group-ids options)

Given a list of consumer groups to describe, requests a map of consumer group -> detailed metadata.

A map of options may be given :

:dvlopt.kafka/timeout Cf. Namespace description

describe-topics

(describe-topics admin topics)
(describe-topics admin topics options)

Given a list of topics to describe, requests a map of topic name -> future resolving to a map containing :

:dvlopt.kafka/internal? Is this topic internal ?

::partition-descriptions List of partition descriptions sorted by partition number, ie. maps containing :

:dvlopt.kafka/leader-node
 Cf. `dvlopt.kafka` for descriptions of Kafka nodes.

:dvlopt.kafka/partition
 Partition number.

:dvlopt.kafka/replica-nodes
 Cf. `dvlopt.kafka` for descriptions of Kafka nodes.

A map of options may be given :

:dvlopt.kafka/timeout Cf. Namespace description
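
A common use of a topic description is finding the leader of each partition. The sketch below extracts that from a stand-in description. It assumes that `::partition-descriptions` resolves to `:dvlopt.kafka.admin/partition-descriptions`; the node values are placeholders because node descriptions are documented in `dvlopt.kafka`, and `partition->leader` is a hypothetical helper.

```clojure
(defn partition->leader
  "Given one resolved topic description, returns a sorted map of
   partition number -> leader node."
  [description]
  (into (sorted-map)
        (map (juxt :dvlopt.kafka/partition :dvlopt.kafka/leader-node))
        (:dvlopt.kafka.admin/partition-descriptions description)))

;; Stand-in for one resolved description; nodes are placeholders.
(def sample-description
  {:dvlopt.kafka/internal? false
   :dvlopt.kafka.admin/partition-descriptions
   [{:dvlopt.kafka/partition     0
     :dvlopt.kafka/leader-node   :node-a
     :dvlopt.kafka/replica-nodes [:node-a :node-b]}
    {:dvlopt.kafka/partition     1
     :dvlopt.kafka/leader-node   :node-b
     :dvlopt.kafka/replica-nodes [:node-b :node-a]}]})

(partition->leader sample-description)
```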

increase-partitions

(increase-partitions admin topics->new-partitions)
(increase-partitions admin topics->new-partitions options)

Requests an increase of the number of partitions for the given topics.

<!> Does not repartition existing topics and the partitioning of new records will be different <!>

For each topic, the new number of partitions has to be specified. The assignments of the new partitions can also be supplied as a list of lists of replica ids, where the first id in each list is the preferred leader.

Returns a map of topic name -> future throwing if an error occurred.

A map of options may be given :

:dvlopt.kafka/timeout Cf. Namespace description

Ex. Given 2 partitions with a replication factor of 3, increases the number to 4 and specifies assignments :

(increase-partitions admin
                     {"some_topic" {::number-of-partitions 4
                                    ::replica-assignments  [[0 1 2]      ;; replicas for the 3rd partition, replica 0 being the preferred leader
                                                            [1 2 3]]}}   ;; replicas for the 4th partition, replica 1 being the preferred leader
                     {:dvlopt.kafka/timeout [5 :seconds]})
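
In the example above, going from 2 to 4 partitions comes with exactly 2 replica lists, one per new partition. A request can be sanity-checked on that basis before it is sent. This is a sketch inferred from the example rather than from the library's source, and `assignments-consistent?` is a hypothetical helper.

```clojure
(defn assignments-consistent?
  "True when `new-count` is an actual increase over `current-count`
   and, if replica assignments are supplied, there is exactly one
   replica list per newly added partition."
  [current-count new-count replica-assignments]
  (boolean
    (and (> new-count current-count)
         (or (nil? replica-assignments)
             (= (count replica-assignments)
                (- new-count current-count))))))

;; 2 -> 4 partitions with 2 replica lists.
(assignments-consistent? 2 4 [[0 1 2] [1 2 3]])

;; 2 -> 4 partitions with only 1 replica list.
(assignments-consistent? 2 4 [[0 1 2]])
```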

topics

(topics admin)
(topics admin options)

Requests a future resolving to a map of topic name -> map containing :

:dvlopt.kafka/internal? Is this topic internal ?

A map of options may be given :

:dvlopt.kafka/internal? Should internal topics be retrieved ?

:dvlopt.kafka/timeout Cf. Namespace description
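
When internal topics are retrieved, the :dvlopt.kafka/internal? flag in each entry can be used to separate them from regular topics. A minimal sketch on a stand-in for the resolved map; `user-topics` is a hypothetical helper, not a library function.

```clojure
(defn user-topics
  "Given a resolved map of topic name -> {:dvlopt.kafka/internal? bool},
   returns the sorted set of names of non-internal topics."
  [topic->info]
  (into (sorted-set)
        (comp (remove (fn [[_ info]] (:dvlopt.kafka/internal? info)))
              (map key))
        topic->info))

;; Stand-in for the resolved result of `topics`.
(def sample-topics
  {"orders"             {:dvlopt.kafka/internal? false}
   "payments"           {:dvlopt.kafka/internal? false}
   "__consumer_offsets" {:dvlopt.kafka/internal? true}})

(user-topics sample-topics)
```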

