(read-kafka bootstrap-servers topic key-deserializer value-deserializer p)
(read-kafka bootstrap-servers topic key-deserializer value-deserializer options p)
Reads from a Kafka topic. Returns a `KafkaRecord` (https://beam.apache.org/releases/javadoc/2.17.0/org/apache/beam/sdk/io/kafka/KafkaRecord.html) mapped to a Clojure map:
```
{:payload "Deserialized with `value-deserializer`"
:key "Deserialized with `key-deserializer`"
:offset "..."
:partition "..."
:timestamp "..."
:topic "..."
:headers "A map of `{key values}` of `Header` (https://www.javadoc.io/static/org.apache.kafka/kafka-clients/1.0.0/org/apache/kafka/common/header/Header.html)"}
```
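Because each record arrives as a plain Clojure map, downstream steps can destructure it directly. The following is a minimal sketch: `summarize-record` is a hypothetical helper, not part of the library, and the sample values are made up for illustration.

```clojure
;; Hypothetical helper (not part of the library): builds a compact summary
;; from the record map shape documented above.
(defn summarize-record
  [{:keys [topic partition offset payload] record-key :key}]
  {:source  (str topic "/" partition "@" offset)
   :key     record-key
   :payload payload})

;; Sample record with made-up values; :timestamp and :headers are omitted
;; because this helper does not read them.
(summarize-record {:topic "my-topic" :partition 0 :offset 42
                   :key "user-1" :payload "hello"})
;; => {:source "my-topic/0@42", :key "user-1", :payload "hello"}
```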
Examples:
```
(kafka/read-kafka "broker-1:9092,broker-2:9092" "my-topic" key-deserializer value-deserializer options pcoll)
```
Using `StringDeserializer` from `org.apache.kafka.common.serialization` (https://kafka.apache.org/0102/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html):
```
(kafka/read-kafka "broker-1:9092,broker-2:9092" "my-topic" StringDeserializer StringDeserializer {:name :read-from-kafka} pcoll)
```
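The options argument is a plain map, so it can be assembled with ordinary Clojure code before the call. The sketch below builds one from keys listed under "Available options". `bounded-read-options` is a hypothetical helper, and the value shapes (a long for `:with-max-num-records`, `[topic partition]` vectors for `:with-topic-partitions`) are assumptions inferred from the option descriptions rather than confirmed behavior.

```clojure
;; Hypothetical helper (not part of the library): builds an options map that
;; limits the read to specific partitions and a maximum number of records.
;; The value shapes are assumptions based on the option descriptions.
(defn bounded-read-options
  [topic partitions max-records]
  {:name                  :read-from-kafka
   :with-max-num-records  max-records
   :with-topic-partitions (mapv (fn [partition] [topic partition]) partitions)})

(bounded-read-options "my-topic" [0 1] 1000)
;; => {:name :read-from-kafka,
;;     :with-max-num-records 1000,
;;     :with-topic-partitions [["my-topic" 0] ["my-topic" 1]]}

;; Intended use, mirroring the example above (not run here):
;; (kafka/read-kafka "broker-1:9092,broker-2:9092" "my-topic"
;;                   StringDeserializer StringDeserializer
;;                   (bounded-read-options "my-topic" [0 1] 1000) pcoll)
```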
Available options:
- :checkpoint => Given a path, will store the resulting pcoll at this path in edn to facilitate dev/debug.
- :coder => Uses a specific Coder for the results of this transform. Usually defaults to some form of nippy-coder.
- :commit-offsets-in-finalize => Finalized offsets are committed to Kafka.
- :name => Adds a name to the Transform.
- :with-consumer-config-updates => Update configuration for the backend main consumer.
- :with-create-time => Sets the timestamp policy based on the `KafkaTimestampType.CREATE_TIME` timestamp of the records.
- :with-log-append-time => Sets TimestampPolicy to `TimestampPolicyFactory.LogAppendTimePolicy`.
- :with-max-num-records => Similar to `Read.Unbounded.withMaxNumRecords(long)`.
- :with-max-read-time => Similar to `Read.Unbounded.withMaxReadTime(Duration)`.
- :with-offset-consumer-config-overrides => Sets additional configuration for the backend offset consumer.
- :with-processing-time => Sets TimestampPolicy to `TimestampPolicyFactory.ProcessingTimePolicy`.
- :with-read-committed => Sets `isolation.level` to `read_committed` in the Kafka consumer configuration.
- :with-start-read-time => Provide custom `TimestampPolicyFactory` to set event times and watermark for each partition.
- :with-topic-partitions => Sets a list of partitions to read from. The list of partitions should be a collection of ['str-topic-name', int-partition-number] pairs.
- :without-metadata => Returns a PTransform for PCollection of KV, dropping Kafka metadata.

(write-kafka bootstrap-servers topic key-serializer value-serializer p)
(write-kafka bootstrap-servers topic key-serializer value-serializer options p)
Write to Kafka.
Examples:
```
(kafka/write-kafka "broker-1:9092,broker-2:9092" "my-topic" key-serializer value-serializer options pcoll)
```
Available options:
- :checkpoint => Given a path, will store the resulting pcoll at this path in edn to facilitate dev/debug.
- :coder => Uses a specific Coder for the results of this transform. Usually defaults to some form of nippy-coder.
- :name => Adds a name to the Transform.
- :values => Writes just the values to Kafka.
- :with-eos => Wrapper method over `KafkaIO.WriteRecords.withEOS(int, String)`, used to keep the compatibility with old API based on KV type of element.
- :with-input-timestamp => Wrapper method over `KafkaIO.WriteRecords.withInputTimestamp()`, used to keep the compatibility with old API based on KV type of element.
- :with-producer-config-updates => Update configuration for the producer.
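As a hedged sketch of how the write options might be combined: `write-options` below is a hypothetical helper, and passing a map of Kafka producer settings to `:with-producer-config-updates` is an assumption based on the option's description. `acks` and `compression.type` are standard Kafka producer configuration keys.

```clojure
;; Hypothetical helper (not part of the library): builds a write options map
;; with a default "acks" setting that caller-supplied overrides can replace.
;; Assumes :with-producer-config-updates accepts a map of producer settings.
(defn write-options
  [transform-name producer-overrides]
  {:name                         transform-name
   :with-producer-config-updates (merge {"acks" "all"} producer-overrides)})

(write-options :write-to-kafka {"compression.type" "gzip"})
;; => {:name :write-to-kafka,
;;     :with-producer-config-updates {"acks" "all", "compression.type" "gzip"}}

;; Intended use, mirroring the example above (not run here):
;; (kafka/write-kafka "broker-1:9092,broker-2:9092" "my-topic"
;;                    key-serializer value-serializer
;;                    (write-options :write-to-kafka {}) pcoll)
```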