PuppetDB command handling
Commands are the mechanism by which changes are made to PuppetDB's
model of a population. Commands are represented by command objects, which have the following JSON wire format:
{"command": "...",
"version": 123,
"payload": <json object>}
payload must be a valid JSON string of any sort. It's up to an
individual handler function how to interpret that object.
More details can be found in the spec.
Commands should include a received field containing a timestamp
of when the message was first seen by the system. If this is
omitted, it will be added when the message is first parsed, but may
then be somewhat inaccurate.
Commands should include an id field containing a unique integer
identifier for the command. If this is omitted, it will be added
when the message is first parsed.
Failed messages will have an attempts annotation containing an
array of maps of the form:
{:timestamp <timestamp>
:error "some error message"
:trace <stack trace from :exception>}
Each entry corresponds to a single failed attempt at handling the message, containing the error message, stack trace, and timestamp for each failure. PuppetDB may discard messages which have been attempted and failed too many times, or which have experienced fatal errors (including unparseable messages).
Failed messages will be stored in files in the "dead letter
office", located under the MQ data directory, in
/discarded/<command>. These files contain the annotated message,
along with each exception that occured while trying to handle the
message.
We currently support the following wire formats for commands:
Java Strings
UTF-8 encoded byte-array
In either case, the command itself, once string-ified, must be a JSON-formatted string with the aforementioned structure.
PuppetDB command handling
Commands are the mechanism by which changes are made to PuppetDB's
model of a population. Commands are represented by `command
objects`, which have the following JSON wire format:
{"command": "...",
"version": 123,
"payload": <json object>}
`payload` must be a valid JSON string of any sort. It's up to an
individual handler function how to interpret that object.
More details can be found in [the spec](../spec/commands.md).
Commands should include a `received` field containing a timestamp
of when the message was first seen by the system. If this is
omitted, it will be added when the message is first parsed, but may
then be somewhat inaccurate.
Commands should include an `id` field containing a unique integer
identifier for the command. If this is omitted, it will be added
when the message is first parsed.
Failed messages will have an `attempts` annotation containing an
array of maps of the form:
{:timestamp <timestamp>
:error "some error message"
:trace <stack trace from :exception>}
Each entry corresponds to a single failed attempt at handling the
message, containing the error message, stack trace, and timestamp
for each failure. PuppetDB may discard messages which have been
attempted and failed too many times, or which have experienced
fatal errors (including unparseable messages).
Failed messages will be stored in files in the "dead letter
office", located under the MQ data directory, in
`/discarded/<command>`. These files contain the annotated message,
along with each exception that occured while trying to handle the
message.
We currently support the following wire formats for commands:
1. Java Strings
2. UTF-8 encoded byte-array
In either case, the command itself, once string-ified, must be a
JSON-formatted string with the aforementioned structure.(attempt-exec-command {:keys [callback] :as cmd}
db
conn-status
response-pub-chan
stats-atom
shutdown-for-ex
options-config)(broadcast-cmd {:keys [certname command id callback] :as cmd}
write-dbs
pool
response-chan
stats
shutdown-for-ex
options-config)(create-command-handler-threadpool size)Creates an unbounded threadpool with the intent that access to the
threadpool is bounded by the semaphore. Implicitly the threadpool is
bounded by size, but since the semaphore is handling that aspect,
it's more efficient to use an unbounded pool and not duplicate the
constraint in both the semaphore and the threadpool
Creates an unbounded threadpool with the intent that access to the threadpool is bounded by the semaphore. Implicitly the threadpool is bounded by `size`, but since the semaphore is handling that aspect, it's more efficient to use an unbounded pool and not duplicate the constraint in both the semaphore and the threadpool
(create-metrics-for-command! command version)Create a subtree of metrics for the given command and version (if present). If a subtree of metrics already exists, this function is a no-op.
Create a subtree of metrics for the given command and version (if present). If a subtree of metrics already exists, this function is a no-op.
Thread count for delaying messages for retry, and broadcast pool sanity checks.
Thread count for delaying messages for retry, and broadcast pool sanity checks.
(discard-message cmdref ex q dlo maybe-send-cmd-event!)Discards the given cmdref caused by ex
Discards the given `cmdref` caused by `ex`
(do-enqueue-command q
command-chan
write-semaphore
{:keys [command certname command-stream] :as command-req}
maybe-send-cmd-event!)Inputs: [q command-chan write-semaphore {:keys [command certname command-stream], :as command-req} :- queue/command-req-schema maybe-send-cmd-event!]
Stores command in the q and returns its id.
Inputs: [q command-chan write-semaphore {:keys [command certname command-stream], :as command-req} :- queue/command-req-schema maybe-send-cmd-event!]
Stores command in the q and returns its id.(exec-command {:keys [command version] :as cmd}
db
conn-status
start
options-config)Takes a command object and processes it to completion. Dispatch is based on the command's name and version information
Takes a command object and processes it to completion. Dispatch is based on the command's name and version information
(exec-configure-expiration {:keys [id received payload]}
start-time
db
conn-status)(exec-deactivate-node {:keys [id received payload]} start-time db conn-status)(exec-replace-catalog {:keys [id received payload]} start-time db conn-status)(exec-replace-catalog-inputs {:keys [id received payload]}
start-time
db
conn-status)(exec-replace-facts {:keys [payload id received] :as _command}
start-time
db
conn-status)(exec-store-report options-config
{:keys [payload id received]}
start-time
db
conn-status)(global-metric name)Returns the metric identified by name in the "global" metric
hierarchy
Returns the metric identified by `name` in the `"global"` metric hierarchy
(inc-cmd-depth command version)Ensures the command + version metric exists, then increments the
depth for the given command and version
Ensures the `command` + `version` metric exists, then increments the depth for the given `command` and `version`
(log-command-processed-messsage id
received-time
start-time
command-kw
certname
producer-ts
status)(log-command-processed-messsage id
received-time
start-time
command-kw
certname
producer-ts
{:keys [status hash] :as _status}
{:keys [puppet-version] :as _opts})(mark-both-metrics! command version k)Calls mark! on the global and command specific metric for k
Calls `mark!` on the global and command specific metric for `k`
(message-handler q
command-chan
dlo
delay-pool
broadcast-pool
write-dbs
response-chan
stats
options-config
maybe-send-cmd-event!
shutdown-for-ex)Processes the message via (process-message msg), retrying messages that fail via (delay-message msg), and discarding messages that have fatal errors or have exceeded their maximum allowed attempts.
Processes the message via (process-message msg), retrying messages that fail via (delay-message msg), and discarding messages that have fatal errors or have exceeded their maximum allowed attempts.
(prep-replace-facts {:keys [version received] :as command}
{:keys [facts-blocklist facts-blocklist-type]})(process-cmd cmd
cmdref
q
write-dbs
broadcast-pool
response-chan
stats
maybe-send-cmd-event!
shutdown-for-ex
options-config)Processes and acknowledges a successful command ref and updates the relevant metrics. Any exceptions that arise are unhandled and expected to be caught by the caller.
Processes and acknowledges a successful command ref and updates the relevant metrics. Any exceptions that arise are unhandled and expected to be caught by the caller.
(process-command! command db options-config)Takes a command object and processes it to completion. Dispatch is based on the command's name and version information. Should be given the options-config returned by select-command-processing-config
Takes a command object and processes it to completion. Dispatch is based on the command's name and version information. Should be given the options-config returned by select-command-processing-config
(process-delete-cmd cmd
{:keys [command version certname id received] :as cmdref}
q
response-chan
stats
_options-config
maybe-send-cmd-event!)Processes a command ref marked for deletion. This is similar to processing a non-delete cmdref except different metrics need to be updated to indicate the difference in command
Processes a command ref marked for deletion. This is similar to processing a non-delete cmdref except different metrics need to be updated to indicate the difference in command
(process-message {:keys [certname command version received delete? id]
:as cmdref}
q
command-chan
dlo
delay-pool
broadcast-pool
write-dbs
response-chan
stats
options-config
maybe-send-cmd-event!
shutdown-for-ex)(enqueue-command this command version certname producer-ts payload compression)(enqueue-command this
command
version
certname
producer-ts
payload
compression
command-callback)Submits the command for processing, and then returns its unique id.
Submits the command for processing, and then returns its unique id.
(pause-execution this)Pause command execution
Pause command execution
(response-mult this)Returns a core.async mult to which {:id :exception} maps are written after each command has been processed.
Returns a core.async mult to which {:id :exception} maps are written after
each command has been processed.(resume-execution this)Resume command execution
Resume command execution
(stats this)Returns command processing statistics as a map containing :received-commands (a count of the commands received so far by the current service instance), and :executed-commands (a count of the commands that the current instance has processed without triggering an exception).
Returns command processing statistics as a map containing :received-commands (a count of the commands received so far by the current service instance), and :executed-commands (a count of the commands that the current instance has processed without triggering an exception).
(schedule-delayed-message cmd ex command-chan delay-pool shutdown-for-ex)Will delay cmd in the delay-pool threadpool for
command-delay-ms. It will then be enqueued in command-chan
for another attempt at processing
Will delay `cmd` in the `delay-pool` threadpool for `command-delay-ms`. It will then be enqueued in `command-chan` for another attempt at processing
(shut-down-after-command-scheduler-unresponsive f)(start-command-service context
config
{:keys [dlo] :as globals}
request-shutdown)(stop-command-service {:keys [consumer-threadpool broadcast-threadpool
command-chan response-chan response-chan-for-pub
response-mult response-pub delay-pool
command-shovel _blocked?]
:as context}
request-shutdown)(warn-deprecated version command)Logs a deprecation warning message for the given command and version
Logs a deprecation warning message for the given `command` and `version`
cljdoc builds & hosts documentation for Clojure/Script libraries
| Ctrl+k | Jump to recent docs |
| ← | Move to previous article |
| → | Move to next article |
| Ctrl+/ | Jump to the search field |