Liking cljdoc? Tell your friends :D
Clojure only.

puppetlabs.puppetdb.command

PuppetDB command handling

Commands are the mechanism by which changes are made to PuppetDB's model of a population. Commands are represented by command objects, which have the following JSON wire format:

{"command": "...",
 "version": 123,
 "payload": <json object>}

payload must be a valid JSON string of any sort. It's up to an individual handler function how to interpret that object.

More details can be found in the spec.

Commands should include a received field containing a timestamp of when the message was first seen by the system. If this is omitted, it will be added when the message is first parsed, but may then be somewhat inaccurate.

Commands should include an id field containing a unique integer identifier for the command. If this is omitted, it will be added when the message is first parsed.

Failed messages will have an attempts annotation containing an array of maps of the form:

{:timestamp <timestamp>
 :error     "some error message"
 :trace     <stack trace from :exception>}

Each entry corresponds to a single failed attempt at handling the message, containing the error message, stack trace, and timestamp for each failure. PuppetDB may discard messages which have been attempted and failed too many times, or which have experienced fatal errors (including unparseable messages).

Failed messages will be stored in files in the "dead letter office", located under the MQ data directory, in /discarded/<command>. These files contain the annotated message, along with each exception that occured while trying to handle the message.

We currently support the following wire formats for commands:

  1. Java Strings

  2. UTF-8 encoded byte-array

In either case, the command itself, once string-ified, must be a JSON-formatted string with the aforementioned structure.

PuppetDB command handling

Commands are the mechanism by which changes are made to PuppetDB's
model of a population. Commands are represented by `command
objects`, which have the following JSON wire format:

    {"command": "...",
     "version": 123,
     "payload": <json object>}

`payload` must be a valid JSON string of any sort. It's up to an
individual handler function how to interpret that object.

More details can be found in [the spec](../spec/commands.md).

Commands should include a `received` field containing a timestamp
of when the message was first seen by the system. If this is
omitted, it will be added when the message is first parsed, but may
then be somewhat inaccurate.

Commands should include an `id` field containing a unique integer
identifier for the command. If this is omitted, it will be added
when the message is first parsed.

Failed messages will have an `attempts` annotation containing an
array of maps of the form:

    {:timestamp <timestamp>
     :error     "some error message"
     :trace     <stack trace from :exception>}

Each entry corresponds to a single failed attempt at handling the
message, containing the error message, stack trace, and timestamp
for each failure. PuppetDB may discard messages which have been
attempted and failed too many times, or which have experienced
fatal errors (including unparseable messages).

Failed messages will be stored in files in the "dead letter
office", located under the MQ data directory, in
`/discarded/<command>`. These files contain the annotated message,
along with each exception that occured while trying to handle the
message.

We currently support the following wire formats for commands:

1. Java Strings

2. UTF-8 encoded byte-array

In either case, the command itself, once string-ified, must be a
JSON-formatted string with the aforementioned structure.
raw docstring

attempt-exec-commandclj

(attempt-exec-command {:keys [callback] :as cmd}
                      db
                      conn-status
                      response-pub-chan
                      stats-atom
                      shutdown-for-ex
                      options-config)
source

broadcast-cmdclj

(broadcast-cmd {:keys [certname command id callback] :as cmd}
               write-dbs
               pool
               response-chan
               stats
               shutdown-for-ex
               options-config)
source

call-with-quick-retryclj

(call-with-quick-retry num-retries shutdown-for-ex f)
source

cmd-metricclj

(cmd-metric cmd version name)
source

command-delay-msclj

source

command-scheduler-shutdown-wait-msclj

source

command-serviceclj

source

command-validatorclj

(command-validator name schema)
source

concurrent-depth-hookclj

(concurrent-depth-hook)
source

create-broadcast-poolclj

(create-broadcast-pool cmd-concurrency write-dbs)
source

create-command-handler-threadpoolclj

(create-command-handler-threadpool size)

Creates an unbounded threadpool with the intent that access to the threadpool is bounded by the semaphore. Implicitly the threadpool is bounded by size, but since the semaphore is handling that aspect, it's more efficient to use an unbounded pool and not duplicate the constraint in both the semaphore and the threadpool

Creates an unbounded threadpool with the intent that access to the
threadpool is bounded by the semaphore. Implicitly the threadpool is
bounded by `size`, but since the semaphore is handling that aspect,
it's more efficient to use an unbounded pool and not duplicate the
constraint in both the semaphore and the threadpool
sourceraw docstring

create-metricsclj

(create-metrics prefix)
source

create-metrics-for-command!clj

(create-metrics-for-command! command version)

Create a subtree of metrics for the given command and version (if present). If a subtree of metrics already exists, this function is a no-op.

Create a subtree of metrics for the given command and version (if
present).  If a subtree of metrics already exists, this function is
a no-op.
sourceraw docstring

deactivate-node-wire-v1->wire-3clj

(deactivate-node-wire-v1->wire-3 deactive-node)
source

deactivate-node-wire-v2->wire-3clj

(deactivate-node-wire-v2->wire-3 deactive-node)
source

delay-pool-sizeclj

Thread count for delaying messages for retry, and broadcast pool sanity checks.

Thread count for delaying messages for retry, and broadcast pool
sanity checks.
sourceraw docstring

discard-messageclj

(discard-message cmdref ex q dlo maybe-send-cmd-event!)

Discards the given cmdref caused by ex

Discards the given `cmdref` caused by `ex`
sourceraw docstring

do-enqueue-commandclj

(do-enqueue-command q
                    command-chan
                    write-semaphore
                    {:keys [command certname command-stream] :as command-req}
                    maybe-send-cmd-event!)

Inputs: [q command-chan write-semaphore {:keys [command certname command-stream], :as command-req} :- queue/command-req-schema maybe-send-cmd-event!]

Stores command in the q and returns its id.

Inputs: [q command-chan write-semaphore {:keys [command certname command-stream], :as command-req} :- queue/command-req-schema maybe-send-cmd-event!]

Stores command in the q and returns its id.
sourceraw docstring

do-replace-catalogclj

(do-replace-catalog catalog certname producer-timestamp received)
source

do-replace-factsclj

(do-replace-facts certname producer-timestamp payload)
source

enqueue-delayed-messageclj

(enqueue-delayed-message command-chan narrowed-entry)
source

exec-commandclj

(exec-command {:keys [command version] :as cmd}
              db
              conn-status
              start
              options-config)

Takes a command object and processes it to completion. Dispatch is based on the command's name and version information

Takes a command object and processes it to completion. Dispatch is
based on the command's name and version information
sourceraw docstring

exec-configure-expirationclj

(exec-configure-expiration {:keys [id received payload]}
                           start-time
                           db
                           conn-status)
source

exec-deactivate-nodeclj

(exec-deactivate-node {:keys [id received payload]} start-time db conn-status)
source

exec-replace-catalogclj

(exec-replace-catalog {:keys [id received payload]} start-time db conn-status)
source

exec-replace-catalog-inputsclj

(exec-replace-catalog-inputs {:keys [id received payload]}
                             start-time
                             db
                             conn-status)
source

exec-replace-factsclj

(exec-replace-facts {:keys [payload id received] :as _command}
                    start-time
                    db
                    conn-status)
source

exec-store-reportclj

(exec-store-report options-config
                   {:keys [payload id received]}
                   start-time
                   db
                   conn-status)
source

fatalityclj

(fatality cause)
source

global-metricclj

(global-metric name)

Returns the metric identified by name in the "global" metric hierarchy

Returns the metric identified by `name` in the `"global"` metric
hierarchy
sourceraw docstring

inc-cmd-depthclj

(inc-cmd-depth command version)

Ensures the command + version metric exists, then increments the depth for the given command and version

Ensures the `command` + `version` metric exists, then increments the
depth for the given `command` and `version`
sourceraw docstring

init-command-serviceclj

(init-command-service context config request-shutdown)
source

log-command-processed-messsageclj

(log-command-processed-messsage id
                                received-time
                                start-time
                                command-kw
                                certname
                                producer-ts
                                status)
(log-command-processed-messsage id
                                received-time
                                start-time
                                command-kw
                                certname
                                producer-ts
                                {:keys [status hash] :as _status}
                                {:keys [puppet-version] :as _opts})
source

make-cmd-processed-messageclj

(make-cmd-processed-message cmd ex)
source

mark-both-metrics!clj

(mark-both-metrics! command version k)

Calls mark! on the global and command specific metric for k

Calls `mark!` on the global and command specific metric for `k`
sourceraw docstring

maximum-allowable-retriesclj

source

message-handlerclj

(message-handler q
                 command-chan
                 dlo
                 delay-pool
                 broadcast-pool
                 write-dbs
                 response-chan
                 stats
                 options-config
                 maybe-send-cmd-event!
                 shutdown-for-ex)

Processes the message via (process-message msg), retrying messages that fail via (delay-message msg), and discarding messages that have fatal errors or have exceeded their maximum allowed attempts.

Processes the message via (process-message msg), retrying messages
that fail via (delay-message msg), and discarding messages that have
fatal errors or have exceeded their maximum allowed attempts.
sourceraw docstring

metricsclj

source

mq-metrics-registryclj

source

normal-broadcast-pool-sizeclj

(normal-broadcast-pool-size cmd-concurrency write-dbs)
source

pause-command-serviceclj

(pause-command-service blocked?)
source

prep-commandclj

(prep-command {:keys [command] :as cmd} options-config)
source

prep-configure-expirationclj

(prep-configure-expiration command)
source

prep-deactivate-nodeclj

(prep-deactivate-node {:keys [version] :as command})
source

prep-replace-catalogclj

(prep-replace-catalog {:keys [payload received version] :as command})
source

prep-replace-catalog-inputsclj

(prep-replace-catalog-inputs command)
source

prep-replace-factsclj

(prep-replace-facts {:keys [version received] :as command}
                    {:keys [facts-blocklist facts-blocklist-type]})
source

prep-store-reportclj

(prep-store-report {:keys [version received] :as command})
source

process-cmdclj

(process-cmd cmd
             cmdref
             q
             write-dbs
             broadcast-pool
             response-chan
             stats
             maybe-send-cmd-event!
             shutdown-for-ex
             options-config)

Processes and acknowledges a successful command ref and updates the relevant metrics. Any exceptions that arise are unhandled and expected to be caught by the caller.

Processes and acknowledges a successful command ref and updates the
relevant metrics. Any exceptions that arise are unhandled and
expected to be caught by the caller.
sourceraw docstring

process-command!clj

(process-command! command db options-config)

Takes a command object and processes it to completion. Dispatch is based on the command's name and version information. Should be given the options-config returned by select-command-processing-config

Takes a command object and processes it to completion. Dispatch is
based on the command's name and version information. Should be given
the options-config returned by select-command-processing-config
sourceraw docstring

process-delete-cmdclj

(process-delete-cmd cmd
                    {:keys [command version certname id received] :as cmdref}
                    q
                    response-chan
                    stats
                    _options-config
                    maybe-send-cmd-event!)

Processes a command ref marked for deletion. This is similar to processing a non-delete cmdref except different metrics need to be updated to indicate the difference in command

Processes a command ref marked for deletion. This is similar to
processing a non-delete cmdref except different metrics need to be
updated to indicate the difference in command
sourceraw docstring

process-messageclj

(process-message {:keys [certname command version received delete? id]
                  :as cmdref}
                 q
                 command-chan
                 dlo
                 delay-pool
                 broadcast-pool
                 write-dbs
                 response-chan
                 stats
                 options-config
                 maybe-send-cmd-event!
                 shutdown-for-ex)
source

PuppetDBCommandDispatchercljprotocol

enqueue-commandclj

(enqueue-command this command version certname producer-ts payload compression)
(enqueue-command this
                 command
                 version
                 certname
                 producer-ts
                 payload
                 compression
                 command-callback)

Submits the command for processing, and then returns its unique id.

Submits the command for processing, and then returns its unique id.

pause-executionclj

(pause-execution this)

Pause command execution

Pause command execution

response-multclj

(response-mult this)

Returns a core.async mult to which {:id :exception} maps are written after each command has been processed.

Returns a core.async mult to which {:id :exception} maps are written after
each command has been processed.

resume-executionclj

(resume-execution this)

Resume command execution

Resume command execution

statsclj

(stats this)

Returns command processing statistics as a map containing :received-commands (a count of the commands received so far by the current service instance), and :executed-commands (a count of the commands that the current instance has processed without triggering an exception).

Returns command processing statistics as a map
containing :received-commands (a count of the commands received so
far by the current service instance), and :executed-commands (a
count of the commands that the current instance has processed
without triggering an exception).
source

quick-retry-countclj

source

resume-command-serviceclj

(resume-command-service blocked?)
source

rm-facts-by-regexclj

(rm-facts-by-regex facts-blocklist fact-map)
source

schedule-delayed-messageclj

(schedule-delayed-message cmd ex command-chan delay-pool shutdown-for-ex)

Will delay cmd in the delay-pool threadpool for command-delay-ms. It will then be enqueued in command-chan for another attempt at processing

Will delay `cmd` in the `delay-pool` threadpool for
`command-delay-ms`. It will then be enqueued in `command-chan`
for another attempt at processing
sourceraw docstring

schedule-msg-afterclj

source

select-command-processing-configclj

(select-command-processing-config config)
source

shut-down-after-command-scheduler-unresponsiveclj

(shut-down-after-command-scheduler-unresponsive f)
source

shut-down-command-scheduler-or-dieclj

(shut-down-command-scheduler-or-die s request-shutdown)
source

simplify-serialization-exclj

(simplify-serialization-ex ex)
source

start-command-serviceclj

(start-command-service context
                       config
                       {:keys [dlo] :as globals}
                       request-shutdown)
source

stop-command-serviceclj

(stop-command-service {:keys [consumer-threadpool broadcast-threadpool
                              command-chan response-chan response-chan-for-pub
                              response-mult response-pub delay-pool
                              command-shovel _blocked?]
                       :as context}
                      request-shutdown)
source

stop-commands-wait-msclj

source

supported-command?clj

(supported-command? {:keys [command version] :as _cmd})
source

threadpool-shutdown-msclj

source

throw-unless-readyclj

(throw-unless-ready context)
source

update-counter!clj

(update-counter! metric command version action!)
source

upon-error-throw-fatalitycljmacro

(upon-error-throw-fatality & body)
source

validate-deactivateclj

source

validate-expirationclj

source

validate-reportclj

source

warn-deprecatedclj

(warn-deprecated version command)

Logs a deprecation warning message for the given command and version

Logs a deprecation warning message for the given `command` and `version`
sourceraw docstring

cljdoc builds & hosts documentation for Clojure/Script libraries

Keyboard shortcuts
Ctrl+kJump to recent docs
Move to previous article
Move to next article
Ctrl+/Jump to the search field
× close