
datasplash.kafka


read-kafka

(read-kafka bootstrap-servers topic key-deserializer value-deserializer p)
(read-kafka bootstrap-servers
            topic
            key-deserializer
            value-deserializer
            options
            p)

Reads from a Kafka topic. Returns each `KafkaRecord` (https://beam.apache.org/releases/javadoc/2.17.0/org/apache/beam/sdk/io/kafka/KafkaRecord.html) mapped to a Clojure map:

```
    {:payload   "Deserialized with `value-deserializer`"
     :key       "Deserialized with `key-deserializer`"
     :offset    "..."     
     :partition "..."
     :timestamp "..."   
     :topic     "..."   
     :headers   "A map of `{key values}` of `Header` (https://www.javadoc.io/static/org.apache.kafka/kafka-clients/1.0.0/org/apache/kafka/common/header/Header.html)"}
```

Examples:
```
(kafka/read-kafka "broker-1:9092,broker-2:9092" "my-topic" key-deserializer value-deserializer options pcoll)
```

Using `StringDeserializer` from `org.apache.kafka.common.serialization` (https://kafka.apache.org/0102/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html):
```
(kafka/read-kafka "broker-1:9092,broker-2:9092" "my-topic" StringDeserializer StringDeserializer {:name :read-from-kafka} pcoll)
```
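
Putting it together, a minimal end-to-end sketch might look like the following. This is a sketch only: it assumes `datasplash.api` is aliased as `ds` with its usual `make-pipeline`/`map`/`run-pipeline` helpers, and the broker and topic names are placeholders.
```
(require '[datasplash.api :as ds]
         '[datasplash.kafka :as kafka])
(import 'org.apache.kafka.common.serialization.StringDeserializer)

;; Hypothetical pipeline: read records, keep only the deserialized payloads.
(let [p (ds/make-pipeline [])]
  (->> p
       (kafka/read-kafka "broker-1:9092,broker-2:9092" "my-topic"
                         StringDeserializer StringDeserializer
                         {:name :read-from-kafka})
       ;; each element is the Clojure map described above
       (ds/map :payload {:name :extract-payload}))
  (ds/run-pipeline p))
```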

Available options:

  - :checkpoint => Given a path, will store the resulting pcoll at this path in edn to facilitate dev/debug.
  - :coder => Uses a specific Coder for the results of this transform. Usually defaults to some form of nippy-coder.
  - :commit-offsets-in-finalize => Finalized offsets are committed to Kafka.
  - :name => Adds a name to the Transform.
  - :with-consumer-config-updates => Update configuration for the backend main consumer.
  - :with-create-time => Sets the timestamps policy based on `KafkaTimestampType.CREATE_TIME` timestamp of the records.
  - :with-log-append-time => Sets TimestampPolicy to `TimestampPolicyFactory.LogAppendTimePolicy`.
  - :with-max-num-records => Similar to `Read.Unbounded.withMaxNumRecords(long)`.
  - :with-max-read-time => Similar to `Read.Unbounded.withMaxReadTime(Duration)`.
  - :with-offset-consumer-config-overrides => Update configuration for the backend offset consumer.
  - :with-processing-time => Sets TimestampPolicy to `TimestampPolicyFactory.ProcessingTimePolicy`.
  - :with-read-committed => Sets "isolation_level" to "read_committed" in Kafka consumer configuration.
  - :with-start-read-time => Provides a custom `TimestampPolicyFactory` to set event times and the watermark for each partition.
  - :with-topic-partitions => Sets the list of partitions to read from, given as a collection of `["topic-name" partition-number]` pairs (see the sketch after this list).
  - :without-metadata => Returns a PTransform for a PCollection of KV, dropping the Kafka metadata.
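
For illustration, several of these options can be combined. The following hedged sketch (option values are illustrative assumptions, not defaults) reads only two partitions and caps the number of records, which can be handy for dev/debug runs:
```
;; Sketch: read only partitions 0 and 1 of the topic and stop after
;; 1000 records. Broker, topic and partition values are placeholders.
(kafka/read-kafka "broker-1:9092,broker-2:9092" "my-topic"
                  StringDeserializer StringDeserializer
                  {:name                  :read-from-kafka
                   :with-topic-partitions [["my-topic" 0] ["my-topic" 1]]
                   :with-max-num-records  1000}
                  pcoll)
```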

write-kafka

(write-kafka bootstrap-servers topic key-serializer value-serializer p)
(write-kafka bootstrap-servers topic key-serializer value-serializer options p)

Write to Kafka.

Examples:
```
(kafka/write-kafka "broker-1:9092,broker-2:9092" "my-topic" key-serializer value-serializer options pcoll)
```

Available options:

  - :checkpoint => Given a path, will store the resulting pcoll at this path in edn to facilitate dev/debug.
  - :coder => Uses a specific Coder for the results of this transform. Usually defaults to some form of nippy-coder.
  - :name => Adds a name to the Transform.
  - :values => Writes just the values to Kafka (see the sketch after this list).
  - :with-eos => Wrapper method over `KafkaIO.WriteRecords.withEOS(int, String)`, kept for compatibility with the older KV-based API.
  - :with-input-timestamp => Wrapper method over `KafkaIO.WriteRecords.withInputTimestamp()`, kept for compatibility with the older KV-based API.
  - :with-producer-config-updates => Update configuration for the producer.
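
For illustration, a hedged sketch of a values-only write. It assumes `pcoll` holds plain strings, that `StringSerializer` is imported, and that the `:values` flag is enabled by passing a truthy value:
```
(import 'org.apache.kafka.common.serialization.StringSerializer)

;; Sketch: write the elements of `pcoll` as plain string values,
;; without keys (the `:values` option drops the key side).
(->> pcoll
     (kafka/write-kafka "broker-1:9092,broker-2:9092" "out-topic"
                        StringSerializer StringSerializer
                        {:name   :write-to-kafka
                         :values true}))
```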
