Postgres Tail (pg_tail)
I want to share a small utility for PostgreSQL that I use regularly for debugging. It is not a unique utility; there are many others like it, but this one is mine... and it's written in Rust!
Wouldn't it be great to have a tail utility for PostgreSQL so you can see exactly what data was added to the database in real time? Like I said, it's not unique; just google for it. I did.
PostgreSQL Notify
PostgreSQL has a great feature called NOTIFY. It has been around for a while and is used to get notifications when a change is made in the DB. The way I use this feature is the following:
- attach a TRIGGER to a table,
- that trigger organizes the data,
- call pg_notify with the data that is changed/inserted,
- pg_notify pushes that data to a channel; I usually just use the channel name 'table_update'.
To make the aforementioned data useful, I like to get the data in JSON format, and fortunately PostgreSQL speaks JSON.
Setup Triggers
Normally, I'm only interested in UPDATE or INSERT data, so I create triggers for UPDATE and INSERT that call a procedure called table_update_notify.
CREATE TRIGGER notify_ex_notify_update AFTER UPDATE ON notify_example FOR EACH ROW EXECUTE PROCEDURE table_update_notify();
CREATE TRIGGER notify_ex_notify_insert AFTER INSERT ON notify_example FOR EACH ROW EXECUTE PROCEDURE table_update_notify();
On to the stored procedure table_update_notify. This matches on the type of trigger operation. If it's an INSERT, it simply converts the row into JSON using the handy built-in function row_to_json, and wraps some other useful information into a larger JSON object using the json_build_object function. If the operation is an UPDATE, it's slightly more complicated: I like to see a diff of the data. The problem here is that PostgreSQL doesn't have diff support for JSON objects natively, but we can write our own jsonb_subtract operator (more specifically, we can use this function from Mr Schinckel, see below).
-- Function to do nice diffs of JSONB objects (i.e. object_1 - object_2 )
CREATE OR REPLACE FUNCTION "jsonb_subtract"(
  "json" jsonb,
  "remove" jsonb
)
RETURNS jsonb LANGUAGE sql IMMUTABLE STRICT AS $function$
SELECT COALESCE(
  (
    SELECT ('{' || string_agg(to_json("key")::text || ':' || "value", ',') || '}')
    FROM jsonb_each("json")
    WHERE NOT
      ('{' || to_json("key")::text || ':' || "value" || '}')::jsonb <@ "remove"
      -- Note: updated using code from http://8kb.co.uk/blog/2015/01/16/wanting-for-a-hstore-style-delete-operator-in-jsonb/
  ),
  '{}'
)::jsonb
$function$;
-- Operator to do nice diffs of JSONB objects (i.e. object_1 - object_2 )
CREATE OPERATOR - (
  LEFTARG = jsonb,
  RIGHTARG = jsonb,
  PROCEDURE = jsonb_subtract
);
CREATE OR REPLACE FUNCTION table_update_notify() RETURNS trigger AS $$
DECLARE
  id bigint;
  json_new jsonb;
  json_old jsonb;
BEGIN
  -- match on type of trigger operation, INSERT/UPDATE
  IF TG_OP = 'INSERT' THEN
    id = NEW.id;
    PERFORM pg_notify('table_update', json_build_object('table', TG_TABLE_NAME, 'id', id, 'type', TG_OP, 'row', row_to_json(NEW) )::text);
  -- trigger operation is UPDATE
  ELSIF TG_OP = 'UPDATE' THEN
    id = OLD.id;
    json_new = row_to_json(NEW)::jsonb;
    json_old = row_to_json(OLD)::jsonb;
    PERFORM pg_notify('table_update',
      json_build_object( 'table', TG_TABLE_NAME,
        'id', id,
        'type', TG_OP,
        'row', row_to_json(NEW),
        -- 'diff', json_build_object('before', json_old - json_new, 'after', json_new - json_old)
        'old', json_old - json_new,
        'new', json_new - json_old
      )::text);
  -- any other row-level operation (i.e. DELETE): only OLD is populated, so send the old row
  ELSE
    id = OLD.id;
    PERFORM pg_notify('table_update', json_build_object('table', TG_TABLE_NAME, 'id', id, 'type', TG_OP, 'row', row_to_json(OLD) )::text);
  END IF;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
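To make the diff a bit more concrete, here is a minimal Rust sketch of what the subtraction does for a flat row: keep every key/value pair of the left-hand object that does not appear identically in the right-hand one. It is only an illustration, not part of pg_tail. The names subtract and row are hypothetical, values are simplified to strings, and the real SQL function uses JSONB containment (<@), which is looser than plain equality for nested values.

```rust
use std::collections::BTreeMap;

/// Keep the entries of `json` whose exact key/value pair is NOT in `remove`.
/// Simplification: flat string values and plain equality instead of JSONB containment.
fn subtract(
    json: &BTreeMap<String, String>,
    remove: &BTreeMap<String, String>,
) -> BTreeMap<String, String> {
    json.iter()
        .filter(|(key, value)| remove.get(*key) != Some(*value))
        .map(|(key, value)| (key.clone(), value.clone()))
        .collect()
}

/// Hypothetical helper: build a two-column row like the notify_example table.
fn row(id: &str, name: &str) -> BTreeMap<String, String> {
    let mut map = BTreeMap::new();
    map.insert(String::from("id"), String::from(id));
    map.insert(String::from("name"), String::from(name));
    map
}

fn main() {
    let old = row("1", "Yoda");
    let new = row("1", "Yogurt");
    // The unchanged "id" column drops out of both sides of the diff.
    println!("old: {:?}", subtract(&old, &new));
    println!("new: {:?}", subtract(&new, &old));
}
```

Subtracting in both directions is what gives the 'old' and 'new' keys in the UPDATE payload: each side only carries the columns that actually changed.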
The Rusty Side
What does this get us? For now, we just get JSON pushed out on the table_update channel. Now we need to write some code to listen on the channel. Luckily, rust-postgres has support for this and it's very straightforward. Just execute the LISTEN postgres command, set up the notifier, and wait for new notifications to come through.
let listen_command = format!("LISTEN {}", listen_channel);
pg_conn.execute(listen_command.as_str(), &[]).expect("Could not send LISTEN");
let notifications = pg_conn.notifications();
// https://sfackler.github.io/rust-postgres/doc/v0.11.11/postgres/notification/struct.BlockingIter.html
let mut it = notifications.blocking_iter();
println!("Waiting for notifications... on:{} channel:{} ", db_conn_string, listen_channel);
loop {
    match it.next() {
        Ok(Some(notification)) => {
            // https://github.com/sfackler/rust-postgres/blob/master/postgres-shared/src/lib.rs
            let v: Value = serde_json::from_str(&notification.payload).unwrap();
            println!("Got update processes_id:{}, channel:{}", notification.process_id, notification.channel);
            println!(" data {}", serde_json::to_string_pretty(&v).unwrap_or("Can't unwrap data".into()));
        },
        Err(err) => println!("Got err {:?}", err),
        _ => panic!("Unexpected state.")
    }
}
It's worth noting that this loop will block until a notification has arrived. I eventually moved over from BlockingIter to TimeoutIter, so the loop gets control back when nothing arrives within the timeout.
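The difference between waiting forever and waiting with a timeout can be sketched without a database. The example below is an analogy only: it uses a std::sync::mpsc channel as a stand-in for the notification stream and is not the rust-postgres API. The names Step and poll_once are hypothetical.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Outcome of one polling step (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum Step {
    Payload(String),
    Idle,
    Closed,
}

/// Wait for at most `timeout` for the next payload instead of blocking forever.
fn poll_once(rx: &Receiver<String>, timeout: Duration) -> Step {
    match rx.recv_timeout(timeout) {
        Ok(payload) => Step::Payload(payload),
        Err(RecvTimeoutError::Timeout) => Step::Idle,
        Err(RecvTimeoutError::Disconnected) => Step::Closed,
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // Stand-in producer: sends one payload, then drops the sender.
    let producer = thread::spawn(move || {
        tx.send(String::from(r#"{"type":"INSERT"}"#)).unwrap();
    });
    producer.join().unwrap();

    loop {
        match poll_once(&rx, Duration::from_millis(100)) {
            Step::Payload(payload) => println!("data {}", payload),
            // With a timeout the loop can do other work here, e.g. check a shutdown flag.
            Step::Idle => println!("nothing yet"),
            Step::Closed => break,
        }
    }
}
```

The Idle arm is the point of the exercise: a purely blocking iterator never reaches it, so the program cannot react to anything except the next notification.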
That is the guts of the program. I have the full source code in my example project which also has a small amount of code to handle command line options.
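One detail in the listener is worth a note: the channel name is spliced into the LISTEN statement with format!, and LISTEN takes an identifier rather than a bind parameter. If the channel name comes from the command line, quoting it as an identifier is a cheap safeguard. Below is a minimal sketch, assuming the usual PostgreSQL rule for quoted identifiers (wrap in double quotes, double any embedded double quote). The helper names quote_ident and listen_command are hypothetical and not part of rust-postgres.

```rust
/// Quote `name` as a PostgreSQL identifier: wrap it in double quotes and
/// double any embedded double quote.
fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Build the LISTEN statement for a channel name taken from user input.
fn listen_command(channel: &str) -> String {
    format!("LISTEN {}", quote_ident(channel))
}

fn main() {
    // A normal channel name passes through unchanged, just quoted.
    println!("{}", listen_command("table_update"));
    // A hostile name stays inside a single quoted identifier.
    println!("{}", listen_command("evil\"; DROP TABLE notify_example; --"));
}
```

Keep in mind that quoted identifiers are case-sensitive, while unquoted ones are folded to lower case, so the quoted name has to match the string passed to pg_notify exactly.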
Conclusion
The result is that if you insert or update data, you'll get notified. So if you run the following in psql,
[pg_tail] # insert into notify_example (name) values ('Yoda');
[pg_tail] # update notify_example set name='Yogurt' where name = 'Yoda';
You'll get the notifications on the table_update channel:
Waiting for notifications... on:postgres://postgres@localhost:5432/pg_tail channel:table_update
---
Got update processes_id:27200, channel:table_update
data {
  "id": 1,
  "row": {
    "id": 1,
    "name": "Yoda"
  },
  "table": "notify_example",
  "type": "INSERT"
}
---
Got update processes_id:27200, channel:table_update
data {
  "id": 1,
  "new": {
    "name": "Yogurt"
  },
  "old": {
    "name": "Yoda"
  },
  "row": {
    "id": 1,
    "name": "Yogurt"
  },
  "table": "notify_example",
  "type": "UPDATE"
}
For Extra Credit
This approach can be used in conjunction with diesel to create ORM objects from JSON generated from the postgres notify channel.
References
https://www.postgresql.org/docs/current/sql-notify.html
https://www.postgresql.org/docs/current/sql-listen.html
https://pgdash.io/blog/postgres-features.html
http://8kb.co.uk/blog/2015/01/16/wanting-for-a-hstore-style-delete-operator-in-jsonb/
https://github.com/FGRibreau/postgresql-to-amqp/blob/master/src/main.rs
https://schinckel.net/2014/09/29/adding-json(b)-operators-to-postgresql/