A library for subscribing to PostgreSQL queries and getting auto-updating atoms.
A library for subscribing to PostgreSQL queries and getting auto-updating atoms.
(decode-notification msg)
Given a JSON payload string from pg NOTIFY, parse it and decode each column in via the pg driver using the OID.
Given a JSON payload string from pg NOTIFY, parse it and decode each column in via the pg driver using the OID.
(destroy-pg-realtime-objects! conn)
Cleans up all installed triggers and functions that start with pg_realtime.
Cleans up all installed triggers and functions that start with _pg_realtime_.
(should-refresh? conn
result
watched-columns
refresh?
{:keys [table operation changes row] :as notification-data})
(shutdown!)
Shutdown the real-time query system and clean up resources.
Shutdown the real-time query system and clean up resources.
(start!
db-config
{:keys [notification-buffer notification-polling-interval-ms error-handler]
:or {notification-buffer 100
notification-polling-interval-ms 200
error-handler (fn [e] (log/error e "Error in notification handler"))}})
Initialize pg-realtime system. Must be called before using sub. Parameters:
Initialize pg-realtime system. Must be called before using sub. Parameters: - db-config: Database connection configuration map (see pg2 doc for details). The user must have permission to create triggers and functions. - opts: Options map containing: - :error-handler - Function to handle errors during notification processing. Takes the error as argument. Defaults to logging the error. - :notification-buffer - core.async buffer or size of the notification channel buffer (default: 100). - :notification-polling-interval-ms - Interval in milliseconds for polling notifications (default: 200).
(start-notification-poller listener-conn interval-ms)
Spawns a go‑loop that calls pg/poll-notifications
every interval-ms
.
Returns a channel. Close it to stop polling.
Spawns a go‑loop that calls `pg/poll-notifications` every `interval-ms`. Returns a channel. Close it to stop polling.
(sub id)
(sub id conn query)
(sub id
conn
query
{:keys [throttle-ms error-handler refresh?]
:or {throttle-ms 500
refresh? (fn [_conn _result _notification-data] :tracked-columns)}
:as opts})
Subscribe to a query and return an atom that updates when data changes. This will install triggers on the tables used in the query to listen for changes.
Use unsub
to stop the subscription and free up resources (which will stop refreshing the query).
Provide just an id
to return the atom of an existing subscription.
Parameters:
id: Unique identifier for this subscription. Use meaningful/stable names rather than random ones, e.g. :all-users, "user-123-orders". If a sub with this id already exists, the previous one will be automatically removed.
conn: Database connection or pool used to execute the query
query: SQL query string, see pg2/execute for details.
opts: Options map containing:
:throttle-ms - Ensure the query is refreshed at most once every throttle-ms
milliseconds.
This prevents excessive updates on high-traffic tables.
Defaults to 500ms.
:error-handler - Function to handle errors during query execution. It takes the error and the query as arguments. Defaults to logging the error.
:refresh? - Customise when to refresh the query. Can be a map or a function.
When not provided, it defaults to a function that checks if any of the columns used
in the query have been changed (which is always true for :insert/:delete),
which is sufficient for most cases.
When using a map, it should be in the format:
{:table-name {:column-name value}}
Note that the values are coerced to the type OID of the column.
To look up a value in the existing result:
{:table-name {:column-name :result/column-name}}
When using a function, provide a fn that takes 3 args:
`conn`, `result` and `notification-data`
and returns boolean indicating whether to refresh.
`conn` is the database connection used to execute the query.
`result` is the current result of the query.
`notification-data` is a map with keys:
- :operation - The operation that triggered the notification,
e.g. :insert, :update, :delete
- :table - The table that was changed, e.g. :users.
If the schema is not public, it will be in the format :schema/table.
For partitioned tables, this is always the parent table.
- :row - The row that was changed, in the format:
{:column-name value}
The values are coerced to the type OID of the column.
- :changes - A map of changed columns and their old and new values.
The map is in the format:
{:column-name [old-value new-value]}
For :insert operations, old-value is nil. For :delete, new-value is nil.
- :hashed - A set of columns that were hashed in the notification
(because of payload size limits).
You can also return :tracked-columns instead of a boolean to fall back to
the default behavior for a specific table.
Any additional options are passed to pg2/execute
Subscribe to a query and return an atom that updates when data changes. This will install triggers on the tables used in the query to listen for changes. Use `unsub` to stop the subscription and free up resources (which will stop refreshing the query). Provide just an `id` to return the atom of an existing subscription. Parameters: - id: Unique identifier for this subscription. Use meaningful/stable names rather than random ones, e.g. :all-users, "user-123-orders". If a sub with this id already exists, the previous one will be automatically removed. - conn: Database connection or pool used to execute the query - query: SQL query string, see pg2/execute for details. - opts: Options map containing: - :throttle-ms - Ensure the query is refreshed at most once every `throttle-ms` milliseconds. This prevents excessive updates on high-traffic tables. Defaults to 500ms. - :error-handler - Function to handle errors during query execution. It takes the error and the query as arguments. Defaults to logging the error. - :refresh? - Customise when to refresh the query. Can be a map or a function. When not provided, it defaults to a function that checks if any of the columns used in the query have been changed (which is always true for :insert/:delete), which is sufficient for most cases. When using a map, it should be in the format: {:table-name {:column-name value}} Note that the values are coerced to the type OID of the column. To look up a value in the existing result: {:table-name {:column-name :result/column-name}} When using a function, provide a fn that takes 3 args: `conn`, `result` and `notification-data` and returns boolean indicating whether to refresh. `conn` is the database connection used to execute the query. `result` is the current result of the query. `notification-data` is a map with keys: - :operation - The operation that triggered the notification, e.g. :insert, :update, :delete - :table - The table that was changed, e.g. :users. If the schema is not public, it will be in the format :schema/table. For partitioned tables, this is always the parent table. - :row - The row that was changed, in the format: {:column-name value} The values are coerced to the type OID of the column. - :changes - A map of changed columns and their old and new values. The map is in the format: {:column-name [old-value new-value]} For :insert operations, old-value is nil. For :delete, new-value is nil. - :hashed - A set of columns that were hashed in the notification (because of payload size limits). You can also return :tracked-columns instead of a boolean to fall back to the default behavior for a specific table. - Any additional options are passed to pg2/execute
(unsub id)
Unsubscribe from a real-time query subscription & clean up resources. This does not remove any installed triggers.
Unsubscribe from a real-time query subscription & clean up resources. This does not remove any installed triggers.
cljdoc is a website building & hosting documentation for Clojure/Script libraries
× close