(decode-messages options p)
Converts the input PubsubMessages to Clojure objects. Use after `read-from-pubsub` with type `:raw`. Returns a map with a `:payload` key and an `:attributes` key. Assumes the payload is UTF-8 encoded.
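To make the decoded shape concrete, here is a minimal sketch in plain Clojure. It does not call the library: `raw->message` is a hypothetical helper that only mimics the documented result (a map with a UTF-8 decoded `:payload` and an `:attributes` map), and string attribute keys are an assumption.

```clojure
;; Illustrative only: `raw->message` is a hypothetical helper, not part of
;; the library. It models the documented {:payload ... :attributes ...} shape.
(defn raw->message
  "Builds a message map from UTF-8 payload bytes and an attribute map."
  [^bytes payload-bytes attributes]
  {:payload    (String. payload-bytes "UTF-8")
   :attributes (into {} attributes)})

(def example-message
  (raw->message (.getBytes "{\"id\": 1}" "UTF-8")
                {"source" "sensor-a"})) ; string keys are an assumption

;; Destructure the map the same way a mapping function in a pipeline would.
(let [{:keys [payload attributes]} example-message]
  (println payload (get attributes "source")))
```

Downstream functions can then destructure `:payload` and `:attributes` directly, as the `read-from-pubsub` example does.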
(encode-messages options p)
Converts the input to PubsubMessages. Use before `write-to-pubsub` with type `:raw`. Takes as input a map with a `:payload` key and an `:attributes` key. Assumes the payload is UTF-8 encoded.
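The UTF-8 assumption matters for payloads containing non-ASCII characters, where the byte length differs from the character count. The following sketch uses plain Clojure only; `message->payload-bytes` and `payload-bytes->string` are hypothetical helpers, not library functions, and the message map is made up for illustration.

```clojure
;; Illustrative only: these helpers are hypothetical and do not reproduce
;; the library's encoding code. They show a UTF-8 round trip of a payload.
(defn message->payload-bytes
  "Encodes the :payload string of a message map as UTF-8 bytes."
  [{:keys [payload]}]
  (.getBytes ^String payload "UTF-8"))

(defn payload-bytes->string
  "Decodes UTF-8 bytes back into a string."
  [^bytes bs]
  (String. bs "UTF-8"))

;; A message in the documented input shape; the attribute is made up.
(def message {:payload "caf\u00e9" :attributes {"lang" "fr"}})

(def encoded (message->payload-bytes message))

;; The payload has 4 characters but occupies 5 bytes in UTF-8.
(println (count (:payload message)) (count encoded))
(println (= (:payload message) (payload-bytes->string encoded)))
```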
(read-from-pubsub subscription-or-topic p)
(read-from-pubsub subscription-or-topic
{:keys [kind timestamp-label type]
:or {kind :subscription type :string}
:as options}
p)
Create an unbounded PCollection from a pubsub stream.
See https://cloud.google.com/dataflow/model/pubsub-io#reading-with-pubsubio.

Examples:
```
;; Assuming input messages are UTF-8 encoded Strings:
(ps/read-from-pubsub "projects/my-project/subscriptions/my-subscription" pcoll)
```
If you need to access some attributes:
```
;; payload will be a string and attributes a map
(->> (ps/read-from-pubsub "projects/my-project/subscriptions/my-subscription" {:type :raw} pcoll)
     (ps/decode-messages {})
     (ds/map (fn [{:keys [payload attributes]}] (json/decode payload))))
```
Available options:
- :checkpoint => Given a path, will store the resulting pcoll at this path in edn to facilitate dev/debug.
- :coder => Uses a specific Coder for the results of this transform. Usually defaults to some form of nippy-coder.
- :kind => Specifies whether the input is a `:subscription` or a `:topic` (defaults to `:subscription`, per the signature above).
- :name => Adds a name to the Transform.
- :timestamp-label => Sets the timestamp of the message from one of the message's attributes. The attribute should contain a Unix epoch in milliseconds.
- :type => Specifies the type of message reader, defaults to `:string`. Possible values are `:string`: UTF-8 encoded strings, `:raw`: pubsub message with attributes.
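The `:timestamp-label` option expects the named attribute to hold a Unix epoch in milliseconds. As a rough sketch of what a publisher might attach, the snippet below adds such an attribute to a message map. The helper `with-timestamp-attribute` and the attribute name `"ts"` are hypothetical, and storing the value as a string is an assumption based on Pub/Sub attributes being string key-value pairs.

```clojure
(import 'java.time.Instant)

;; Illustrative only: `with-timestamp-attribute` is a hypothetical helper.
(defn with-timestamp-attribute
  "Stores the instant as epoch milliseconds (as a string) under attribute-name."
  [message attribute-name ^Instant instant]
  (assoc-in message [:attributes attribute-name]
            (str (.toEpochMilli instant))))

(def stamped
  (with-timestamp-attribute {:payload "hello" :attributes {}}
                            "ts" ; hypothetical attribute name
                            (Instant/parse "1970-01-01T00:00:01Z")))

;; One second after the epoch is 1000 milliseconds.
(println (get-in stamped [:attributes "ts"]))
```

A reader would then presumably pass the same attribute name, for example `{:timestamp-label "ts"}`, in the options map.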
(write-to-pubsub topic pcoll)
(write-to-pubsub topic {:keys [type] :or {type :string} :as options} pcoll)
Write the contents of an unbounded PCollection to a pubsub stream.
See https://cloud.google.com/dataflow/model/pubsub-io#writing-with-pubsubio.

Examples:
```
;; Assuming the input pcoll's elements are UTF-8 encoded Strings
(ps/write-to-pubsub "projects/my-project/topics/my-topic" pcoll)
```
If you need to specify some attributes:
```
;; Assuming the input pcoll's elements are {:payload "my message" :attributes {:key value}}
(->> pcoll
     (ps/encode-messages {})
     (ps/write-to-pubsub "projects/my-project/topics/my-topic" {:type :raw}))
```
Available options:
- :checkpoint => Given a path, will store the resulting pcoll at this path in edn to facilitate dev/debug.
- :coder => Uses a specific Coder for the results of this transform. Usually defaults to some form of nippy-coder.
- :name => Adds a name to the Transform.
- :type => Specifies the type of message writer, defaults to `:string`. Possible values are `:string`: UTF-8 encoded strings, `:raw`: raw pubsub message.