Clojure core.async support in kinsky. See https://github.com/pyr/kinsky for example usage.
(channel-listener ch)
A rebalance-listener compatible callback which produces all events onto a channel.
(consumer config)
(consumer config kd vd)
Build an async consumer. Yields a vector of record and control channels.

Arguments config, kd and vd work as for kinsky.client/consumer. The config map must be a valid consumer configuration map and may contain the following additional keys:

- `:input-buffer`: Maximum backlog of control channel messages.
- `:output-buffer`: Maximum number of queued consumed messages.
- `:timeout`: Poll interval.
- `:topic`: Automatically subscribe to this topic before launching the loop.
- `:duplex?`: Yield a duplex channel instead of a vector of two channels.

The resulting control channel is used to interact with the consumer driver and expects map payloads, whose operation is determined by their `:op` key. The following commands are handled:

- `:subscribe`: `{:op :subscribe :topic "foo"}`, subscribe to a topic.
- `:unsubscribe`: `{:op :unsubscribe}`, unsubscribe from all topics.
- `:partitions-for`: `{:op :partitions-for :topic "foo"}`, yield partition info for the given topic. If a `:response` key is present, produce the response there instead of on the record channel.
- `:commit`: `{:op :commit}`, commit offsets. An optional `:topic-offsets` key may be present to commit specific offsets.
- `:pause`: `{:op :pause}`, pause consumption.
- `:resume`: `{:op :resume}`, resume consumption.
- `:callback`: `{:op :callback :callback (fn [d ch])}`, execute a function of 2 arguments, the consumer driver and the output channel, on a woken-up driver.
- `:stop`: `{:op :stop}`, stop and close the consumer.

The resulting output channel will emit payloads as maps containing a `:type` key, where `:type` may be:

- `:record`: A consumed record.
- `:exception`: An exception was raised.
- `:rebalance`: A rebalance event.
- `:eof`: The end of this stream.
- `:partitions`: The result of a `:partitions-for` operation.
- `:woken-up`: A notification that the consumer was woken up.
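The control commands and output payload types listed above are plain maps, so they can be built and dispatched with ordinary Clojure. The following is a minimal sketch that uses only clojure.core; the helper names (`subscribe-msg`, `partitions-for-msg`, `handle-payload`) and the return values of the handlers are hypothetical and are not part of kinsky.

```clojure
;; Control messages are plain maps keyed by :op, as listed above.
;; These helper names are illustrative only.
(defn subscribe-msg [topic] {:op :subscribe :topic topic})
(defn partitions-for-msg [topic] {:op :partitions-for :topic topic})
(def commit-msg {:op :commit})
(def stop-msg {:op :stop})

;; Hypothetical handler: dispatch on the :type key of an output payload.
(defmulti handle-payload :type)

(defmethod handle-payload :record [payload] [:process payload])
(defmethod handle-payload :exception [payload] [:log-error payload])
(defmethod handle-payload :eof [_] [:done])
;; :rebalance, :partitions and :woken-up fall through to the default here.
(defmethod handle-payload :default [payload] [:ignore (:type payload)])
```

In real use the command maps would presumably be put on the control channel and the payloads taken from the record channel with the usual core.async operations; that wiring is left out here because it needs a running broker.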
Default number of messages buffered on control channels.
Default number of messages buffered on the record channel.
Default poll timeout. By default we poll at 100 ms intervals.
(exception? e)
Test if a value is an instance of Exception (or of one of its subclasses).
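A check like this can be expressed with `instance?`. The sketch below is an illustrative stand-in, not necessarily the library's definition, and the name `exception-value?` is hypothetical. It also shows that a `java.lang.Error` is a Throwable but not an Exception.

```clojure
;; Illustrative stand-in; the library's own definition may differ.
(defn exception-value? [e]
  (instance? Exception e))

(exception-value? (ex-info "boom" {}))  ; => true, ExceptionInfo is an Exception
(exception-value? (Error. "fatal"))     ; => false, Error is not an Exception
(exception-value? "not an exception")   ; => false
```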
(make-consumer config kd vd)
Build a consumer, with or without deserializers
(make-producer config ks vs)
Build a producer, with or without serializers
(poller-thread driver inbuf outbuf timeout)
Poll for next messages, catching exceptions and yielding them.
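The "catch and yield" idea can be sketched as a wrapper that turns a thrown exception into a payload instead of letting it escape. This is an assumption-laden illustration: `safe-poll` and `poll-fn` are hypothetical names, and the `:exception` key used to carry the exception object is assumed rather than documented here.

```clojure
;; poll-fn is a hypothetical zero-argument function standing in for a poll call.
(defn safe-poll [poll-fn]
  (try
    (poll-fn)
    (catch Exception e
      ;; Yield the exception as a payload; the :exception key is an assumption.
      {:type :exception :exception e})))
```

A caller can then treat failures like any other payload and dispatch on `:type`.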
(producer config)
(producer config ks)
(producer config ks vs)
Build a producer, reading records to send from a channel.

Arguments config, ks and vs work as for kinsky.client/producer. The config map must be a valid producer configuration map and may contain the following additional keys:

- `:input-buffer`: Maximum backlog of control channel messages.
- `:output-buffer`: Maximum queued output messages.

Yields a vector of two values `[in out]`, an input channel and an output channel.

The resulting input channel is used to interact with the producer driver and expects map payloads, whose operation is determined by their `:op` key. The following commands are handled:

- `:record`: `{:op :record :topic "foo"}`, send a record out. Also performed when no `:op` key is present.
- `:flush`: `{:op :flush}`, flush unsent messages.
- `:partitions-for`: `{:op :partitions-for :topic "foo"}`, yield partition info for the given topic. If a `:response` key is present, produce the response there instead of on the record channel.
- `:close`: `{:op :close}`, close the producer.

The resulting output channel will emit payloads as maps containing a `:type` key, where `:type` may be:

- `:exception`: An exception was raised.
- `:partitions`: The result of a `:partitions-for` operation.
- `:eof`: The producer is closed.
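Since a message with no `:op` key is treated as a record, the effective operation of a producer message can be illustrated with a default lookup. This is a small sketch using only clojure.core; `effective-op` is a hypothetical helper, and the `:key` and `:value` fields of the sample record are assumptions that this section does not list.

```clojure
;; Per the text above, a message without :op is treated as a :record.
(defn effective-op [msg]
  (get msg :op :record))

;; :key and :value are assumed record fields, not documented in this section.
(def record-msg {:topic "foo" :key "k" :value "v"})
(def flush-msg {:op :flush})
(def close-msg {:op :close})
```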
Rely on the standard transducer but indicate that this is a record.
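One way to read this is as a composition of an existing transducer with a step that tags each item. The sketch below assumes that reading; `standard-xform` is a placeholder for the standard transducer (here it passes items through unchanged), `tagging-xform` is a hypothetical name, and the sample map is made-up data.

```clojure
;; Placeholder for the "standard transducer"; here it passes items through.
(def standard-xform (map identity))

;; Compose it with a step that marks each item as a record.
(def tagging-xform
  (comp standard-xform
        (map #(assoc % :type :record))))

(into [] tagging-xform [{:topic "foo" :offset 0}])
;; => [{:topic "foo", :offset 0, :type :record}]
```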