(close-handler {:keys [in-ch close-ch]})
Listens to `close-ch` for a `true` message. When received, shuts down `in-ch` and `close-ch`.
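The behaviour described above can be sketched with core.async. This is only an illustration under assumptions, not the library's actual implementation: the name `close-handler-sketch` and the `:closed` return value are hypothetical, and the real handler may do additional cleanup.

```clojure
(require '[clojure.core.async :as a])

(defn close-handler-sketch
  "Hypothetical sketch: waits on close-ch and closes both channels
  once a `true` message arrives. Other messages are ignored."
  [{:keys [in-ch close-ch]}]
  (a/go-loop []
    (let [msg (a/<! close-ch)]
      (cond
        ;; the shutdown signal: close both channels and stop looping
        (true? msg) (do (a/close! in-ch)
                        (a/close! close-ch)
                        :closed)
        ;; close-ch was closed elsewhere: nothing left to listen to
        (nil? msg)  :closed
        ;; any other message: keep waiting
        :else       (recur)))))
```

A caller would put `true` on `close-ch` and could then observe that takes from `in-ch` return `nil`, which is how core.async signals a closed, drained channel.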
(control-loop {:keys [in-ch ctl-ch out-ch close-ch driver output-policy]})
Starts the loop that processes control commands from the user.
(create config)
Create a producer, returning a map that contains 3 channels:
`:in-ch` - Input channel used to send messages to the producer.
`:ctl-ch` - Control channel used to manage the producer connection.
`:out-ch` - Output channel used to receive messages from the Kafka producer and controller. This channel will be `nil` unless a non-empty `:output-policy` set is specified.
Configuration options:
`:output-policy`: A set containing the output sources that should be returned via `:out-ch`. Valid options include:
`:control`: Output from control operations
`:error`: Any exceptions or errors that occur
`:data`: Response from the Kafka producer after sending a message
The Kafka Java interface returns each of these as a Java object. Gregor converts each to pure data (i.e. a map).
`:timeout`: Time to wait, in milliseconds, for queued messages to send when closing the producer.
`:key-serializer`: Serializer to use for key serialization, default is `:edn`. Valid serializers are `:edn`, `:string`, `:json` and `:keyword`.
`:value-serializer`: Serializer to use for value serialization, default is `:edn`. Valid serializers are `:edn`, `:string` and `:json`.
`:input-buffer`: Buffer size of `:in-ch` and `:ctl-ch`, default is 10.
`:output-buffer`: Buffer size of `:out-ch`, default is 100.
`:kafka-configuration`: Map containing Kafka producer configuration settings. This map will be converted into key/value properties and passed directly to the KafkaProducer object.
`:transducer`: Transformation function to be applied to all data received via the `:in-ch` channel, prior to being posted to Kafka.
Example Configuration:
{:kafka-configuration {:bootstrap.servers "localhost:9092"
                       :max.poll.records 1000}
 :output-policy #{:control :error}
 :key-serializer :string
 :value-serializer :json
 :input-buffer 20
 :output-buffer 50
 :timeout 100}
All key/value pairs in `:kafka-configuration` will be converted and inserted into a property map and passed into the Kafka Java client as configuration options.
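To make the `:transducer` option more concrete, here is a small sketch of a transducer that could be supplied. The message shape (a map with `:topic` and `:value` keys) and the name `example-xf` are assumptions made for illustration; the docstring does not specify what messages on `:in-ch` look like.

```clojure
(require '[clojure.string :as str])

;; Hypothetical transducer: drops messages whose :value is blank,
;; then trims surrounding whitespace from the remaining values.
(def example-xf
  (comp (remove #(str/blank? (:value %)))
        (map #(update % :value str/trim))))

;; A transducer can be exercised without any channel, e.g. with `into`:
(def cleaned
  (into [] example-xf [{:topic "events" :value "  hello  "}
                       {:topic "events" :value ""}]))
```

Because a transducer is independent of its data source, the same `example-xf` can be checked against a plain vector first and then, presumably, passed under the `:transducer` key.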
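The conversion of `:kafka-configuration` into key/value properties might look roughly like the following. This is a sketch under assumptions: `config->properties` is a hypothetical helper name, and whether the library stringifies values in exactly this way is not stated in the docstring.

```clojure
(import 'java.util.Properties)

(defn config->properties
  "Hypothetical helper: copies a Clojure map into a java.util.Properties,
  turning keyword keys into plain strings and values into strings."
  [m]
  (let [props (Properties.)]
    (doseq [[k v] m]
      ;; (name :bootstrap.servers) yields \"bootstrap.servers\"
      (.setProperty props (name k) (str v)))
    props))

(def example-props
  (config->properties {:bootstrap.servers "localhost:9092"
                       :acks "all"}))
```

Using keywords such as `:bootstrap.servers` keeps the configuration map idiomatic on the Clojure side while still producing the dotted string keys the Kafka Java client expects.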
(create-context {:keys [input-buffer output-buffer output-policy timeout
                        transducer]
                 :or {input-buffer default-input-buffer
                      output-buffer default-output-buffer
                      timeout default-timeout
                      output-policy #{}}
                 :as config})
Builds the context map containing the driver, the various channels and the configuration options.
(input-loop {:keys [in-ch out-ch close-ch output-policy driver timeout]
             :as context})
Starts the loop that processes input, waiting for the user to send input on the `in-ch` channel.
(process-input message driver output-policy)
Sends `message` to the Kafka `producer`, returning results via the `out` channel.