Postgres Tail (pg_tail)

By Bradley Noyes
6 min, 1012 words

I want to share a small utility for PostgreSQL that I use regularly for debugging. It is not a unique utility; there are many others like it, but this one is mine... and it's written in Rust!

Wouldn't it be great to have a tail utility for PostgreSQL so you can see exactly what data was added to the database in real time? Like I said, it's not unique; just Google it. I did.

PostgreSQL Notify

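PostgreSQL has a great feature called NOTIFY. It has been around for a while: NOTIFY (paired with LISTEN) sends a notification, with an optional text payload, to every session listening on a named channel, which makes it a handy way to learn that something in the DB changed. Notifications are only delivered once the notifying transaction commits. The way I use this feature is the following: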

  • attach a TRIGGER to a table,
  • the trigger organizes the data,
  • the trigger calls pg_notify with the data that was changed or inserted,
  • pg_notify pushes that data to a channel; I usually just name the channel 'table_update'.

To make that data useful, I like to receive it as JSON, and fortunately PostgreSQL speaks JSON. One caveat: in the default configuration a NOTIFY payload must be shorter than 8000 bytes, so very wide rows may not fit.

Set Up Triggers

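Normally, I'm only interested in UPDATE and INSERT data, so I create triggers for UPDATE and INSERT that call a trigger function named table_update_notify. Create that function (shown below) first, since CREATE TRIGGER fails if the function doesn't exist yet. The function also reads the row's id column, so the table needs one.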

    CREATE TRIGGER notify_ex_notify_update AFTER UPDATE ON notify_example FOR EACH ROW EXECUTE PROCEDURE table_update_notify();
    CREATE TRIGGER notify_ex_notify_insert AFTER INSERT ON notify_example FOR EACH ROW EXECUTE PROCEDURE table_update_notify();

On to the trigger function, table_update_notify. It matches on the type of trigger operation. If it's an INSERT, it simply converts the row into JSON using the handy built-in function row_to_json, and wraps that, plus some other useful information, into a larger JSON object using the json_build_object function. If the operation is an UPDATE, it's slightly more complicated, because I like to see a diff of the data. The problem here is that PostgreSQL doesn't support diffing JSON objects natively, but we can write our own jsonb subtraction operator (more specifically, we can use the jsonb_subtract function from Mr. Schinckel; see the references below). With it, json_old - json_new yields the old values of just the columns that changed, and json_new - json_old yields their new values.

    -- Function to do nice diffs of JSONB objects (i.e. object_1 - object_2 )
    CREATE OR REPLACE FUNCTION "jsonb_subtract"(
      "json" jsonb,
      "remove" jsonb
    )
      RETURNS jsonb LANGUAGE sql IMMUTABLE STRICT AS $function$
       SELECT COALESCE(
       (
           SELECT ('{' || string_agg(to_json("key")::text || ':' || "value", ',') || '}')
           FROM jsonb_each("json")
           WHERE NOT
               ('{' || to_json("key")::text || ':' || "value" || '}')::jsonb <@ "remove"
               -- Note: updated using code from http://8kb.co.uk/blog/2015/01/16/wanting-for-a-hstore-style-delete-operator-in-jsonb/
       ),
      '{}'
       )::jsonb
    $function$;
    
    -- Operator to do nice diffs of JSONB objects (i.e. object_1 - object_2 )
    CREATE OPERATOR - (
      LEFTARG = jsonb,
      RIGHTARG = jsonb,
      PROCEDURE = jsonb_subtract
    );
    
    
    CREATE OR REPLACE FUNCTION table_update_notify() RETURNS trigger AS $$
    DECLARE
      id bigint;
      json_new jsonb;
      json_old jsonb;
    BEGIN
      -- match on type of trigger operation, INSERT/UPDATE
      IF TG_OP = 'INSERT' THEN
       id = NEW.id;
       PERFORM pg_notify('table_update', json_build_object('table', TG_TABLE_NAME, 'id', id, 'type', TG_OP, 'row', row_to_json(NEW) )::text);
      -- trigger operation is UPDATE
      ELSIF TG_OP = 'UPDATE' THEN
       id = OLD.id;
       json_new = row_to_json(NEW)::jsonb;
       json_old = row_to_json(OLD)::jsonb;
       PERFORM pg_notify('table_update',
                   json_build_object(  'table', TG_TABLE_NAME,
                                       'id', id,
                                       'type', TG_OP,
                                       'row', row_to_json(NEW),
                                       -- 'diff',  json_build_object('before', json_old - json_new, 'after', json_new - json_old)
                                       'old', json_old - json_new,
                                       'new', json_new - json_old
                                   )::text);
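      ELSE
       -- Not reached with the INSERT/UPDATE triggers above; for a DELETE there is no NEW row, so report OLD.
       id = OLD.id;
       PERFORM pg_notify('table_update', json_build_object('table', TG_TABLE_NAME, 'id', id, 'type', TG_OP, 'row', row_to_json(OLD) )::text);
      END IF;
      -- The return value of an AFTER ... FOR EACH ROW trigger is ignored.
      RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;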
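To make the subtraction concrete, here is a small Rust model of what json_old - json_new computes. It is a simplification, not the SQL function: it assumes flat objects and represents each row as a BTreeMap from column name to the column's value as JSON text, so a pair is dropped only when the other side has the same key with the same text. Real jsonb containment (`<@`) also compares nested objects and arrays structurally, which this sketch ignores. The names `Row`, `subtract` and `row` are made up for illustration.

```rust
use std::collections::BTreeMap;

/// A flat row: column name -> the column's value as JSON text,
/// e.g. "id" -> "1", "name" -> "\"Yoda\"". A simplified stand-in for jsonb.
type Row = BTreeMap<String, String>;

/// Model of `left - right` with jsonb_subtract, for flat objects only:
/// keep each (key, value) pair of `left` unless `right` has the same key
/// with the same value.
fn subtract(left: &Row, right: &Row) -> Row {
    left.iter()
        .filter(|(key, value)| right.get(*key) != Some(*value))
        .map(|(key, value)| (key.clone(), value.clone()))
        .collect()
}

/// Convenience constructor for examples and tests.
fn row(pairs: &[(&str, &str)]) -> Row {
    pairs
        .iter()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect()
}
```

With a row whose name changes from "Yoda" to "Yogurt", subtract(old, new) keeps only name: "Yoda" and subtract(new, old) keeps only name: "Yogurt"; the unchanged id drops out of both, which is exactly the shape of the old and new fields in an UPDATE notification.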

The Rusty Side

What does this get us? For now, we just get JSON pushed out on the table_update channel. Next, we need some code to listen on that channel. Luckily, rust-postgres supports this and it's very straightforward: execute the LISTEN command, get the connection's notifications, and wait for new ones to come through.

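    // Assumes `pg_conn` is an open postgres::Connection, `db_conn_string` and
    // `listen_channel` hold the command-line values, and `use serde_json::Value;`
    // is in scope.
    let listen_command = format!("LISTEN {}", listen_channel);
    pg_conn.execute(listen_command.as_str(), &[]).expect("Could not send LISTEN");

    let notifications = pg_conn.notifications();

    // https://sfackler.github.io/rust-postgres/doc/v0.11.11/postgres/notification/struct.BlockingIter.html
    let mut it = notifications.blocking_iter();
    println!("Waiting for notifications... on:{}  channel:{} ", db_conn_string, listen_channel);

    loop {
        match it.next() {
            Ok(Some(notification)) => {
                // https://github.com/sfackler/rust-postgres/blob/master/postgres-shared/src/lib.rs
                println!("Got update processes_id:{}, channel:{}", notification.process_id, notification.channel);
                // The payload is plain text: report non-JSON payloads instead of panicking.
                match serde_json::from_str::<Value>(&notification.payload) {
                    Ok(v) => println!(
                        "  data {}",
                        serde_json::to_string_pretty(&v).unwrap_or_else(|_| "Can't format data".into())
                    ),
                    Err(err) => println!("  non-JSON payload ({}): {}", err, notification.payload),
                }
            }
            Err(err) => println!("Got err {:?}", err),
            // blocking_iter waits for the next notification, so running dry is unexpected.
            Ok(None) => panic!("Unexpected state: no notification returned."),
        }
    }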

It's worth noting that this loop blocks until a notification arrives. I eventually moved from BlockingIter to TimeoutIter.
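Moving from a blocking iterator to a timeout-based one is the classic blocking-versus-timeout trade-off: with a timeout, the loop wakes up regularly even when the database is quiet, so it can check for a shutdown request or print a heartbeat. Since rust-postgres needs a live server, the sketch below shows the same loop shape with the standard library's std::sync::mpsc channel, where recv_timeout stands in for TimeoutIter. It is an analogy, not the rust-postgres API, and `tail_with_timeout` is a hypothetical name.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Drain messages from `rx`, waking up at least every `tick` even when no
/// message arrives. Returns (messages received, number of idle timeouts).
/// The loop ends once every sender has been dropped.
fn tail_with_timeout(rx: &Receiver<String>, tick: Duration) -> (Vec<String>, usize) {
    let mut received = Vec::new();
    let mut idle_ticks = 0;
    loop {
        match rx.recv_timeout(tick) {
            Ok(payload) => received.push(payload),
            // Nothing arrived within `tick`: a real tool could check a
            // shutdown flag or print a heartbeat here, then wait again.
            Err(RecvTimeoutError::Timeout) => idle_ticks += 1,
            // All senders are gone, so nothing more can arrive.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    (received, idle_ticks)
}
```

The blocking version would call rx.recv() instead and could only react when a message (or disconnection) arrives.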

That is the guts of the program. The full source code is in my example project, which also has a small amount of code to handle command-line options.
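As a rough, std-only sketch of what that option handling might look like: the code below reads an optional connection string and channel name as positional arguments. This interface is hypothetical, with defaults taken from the example output in the conclusion; the real project may differ or use a crate such as clap. Because LISTEN takes an identifier rather than a bind parameter, the channel name is spliced into the SQL text, so the sketch also quotes it the way PostgreSQL quotes identifiers: wrap it in double quotes and double any embedded double quote. A quoted name is case-sensitive, which lines up with pg_notify('table_update', ...), whose channel argument is not case-folded.

```rust
/// Settings for a pg_tail-style tool. The positional interface and the
/// defaults (taken from the example output) are illustrative assumptions.
#[derive(Debug, PartialEq)]
struct Config {
    db_conn_string: String,
    listen_channel: String,
}

/// Parse `[connection string] [channel]`; `args` excludes the program name,
/// e.g. `parse_args(std::env::args().skip(1))`.
fn parse_args<I: IntoIterator<Item = String>>(args: I) -> Result<Config, String> {
    let mut args = args.into_iter();
    let db_conn_string = args
        .next()
        .unwrap_or_else(|| "postgres://postgres@localhost:5432/pg_tail".to_string());
    let listen_channel = args.next().unwrap_or_else(|| "table_update".to_string());
    // More than two positional arguments is treated as a usage error.
    if let Some(extra) = args.next() {
        return Err(format!("unexpected argument: {}", extra));
    }
    Ok(Config { db_conn_string, listen_channel })
}

/// Quote a channel name as a PostgreSQL identifier: wrap it in double quotes
/// and double any embedded double quote. Empty names and NUL bytes are
/// rejected, since PostgreSQL would refuse them anyway.
fn quote_ident(name: &str) -> Result<String, String> {
    if name.is_empty() || name.contains('\0') {
        return Err(format!("invalid channel name: {:?}", name));
    }
    Ok(format!("\"{}\"", name.replace('"', "\"\"")))
}

/// Build the LISTEN statement for the configured channel.
fn listen_command(config: &Config) -> Result<String, String> {
    quote_ident(&config.listen_channel).map(|q| format!("LISTEN {}", q))
}
```

The resulting string would then be passed to pg_conn.execute in place of the plain format!("LISTEN {}", ...) call.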

Conclusion

The result is that if you insert or update data, you'll get notified. So if you run the following in psql:

    [pg_tail] # insert into notify_example (name) values ('Yoda');
    [pg_tail] # update notify_example set name='Yogurt' where name = 'Yoda';

You'll get these notifications on the table_update channel:

    Waiting for notifications... on:postgres://postgres@localhost:5432/pg_tail  channel:table_update
    ---
    Got update processes_id:27200, channel:table_update
      data {
      "id": 1,
      "row": {
        "id": 1,
        "name": "Yoda"
      },
      "table": "notify_example",
      "type": "INSERT"
    }
    ---
    Got update processes_id:27200, channel:table_update
      data {
      "id": 1,
      "new": {
        "name": "Yogurt"
      },
      "old": {
        "name": "Yoda"
      },
      "row": {
        "id": 1,
        "name": "Yogurt"
      },
      "table": "notify_example",
      "type": "UPDATE"
    }
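The pretty-printed JSON is easy to read, but for a denser, more tail-like view an UPDATE could be collapsed into a single line built from its old and new objects. This is a minimal sketch, assuming the payload has already been decoded (for example with serde_json, as in the loop above) and its old and new objects flattened into maps of column name to JSON text; `summarize_update` and its output format are made up for illustration.

```rust
use std::collections::BTreeMap;

/// Render one UPDATE notification as a single line, e.g.
/// `UPDATE notify_example id=1: name "Yoda" -> "Yogurt"`.
/// `old` and `new` are the payload's diff objects, flattened to
/// column -> JSON text. Columns are listed in sorted order.
fn summarize_update(
    table: &str,
    id: i64,
    old: &BTreeMap<String, String>,
    new: &BTreeMap<String, String>,
) -> String {
    // Normally a changed column appears in both maps; taking the union
    // keeps the output sensible if one side is missing a key.
    let mut columns: Vec<&String> = old.keys().chain(new.keys()).collect();
    columns.sort();
    columns.dedup();

    let changes: Vec<String> = columns
        .into_iter()
        .map(|col| {
            let before = old.get(col).map(String::as_str).unwrap_or("(absent)");
            let after = new.get(col).map(String::as_str).unwrap_or("(absent)");
            format!("{} {} -> {}", col, before, after)
        })
        .collect();

    format!("UPDATE {} id={}: {}", table, id, changes.join(", "))
}
```

For the Yoda/Yogurt update this yields `UPDATE notify_example id=1: name "Yoda" -> "Yogurt"`.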

For Extra Credit

This approach can be used together with Diesel to create ORM objects (Diesel model structs) from the JSON generated on the Postgres notify channel.
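Diesel itself isn't needed to see the shape of the idea: the row object in each payload carries the table's columns, so it can be turned into the same struct you would use as a Diesel model. In practice you would most likely derive serde's Deserialize on that struct; the std-only sketch below does the conversion by hand from a decoded map of column name to JSON text. `NotifyExample` is an assumed model of the example table, with only the id and name columns visible in the output above.

```rust
use std::collections::BTreeMap;

/// Assumed model of the notify_example table: only `id` and `name`,
/// the columns visible in the example output.
#[derive(Debug, PartialEq)]
struct NotifyExample {
    id: i64,
    name: String,
}

impl NotifyExample {
    /// Build a model from a payload's `row` object, flattened to
    /// column -> JSON text. Handles plain integers and simple JSON strings
    /// (no escape sequences) only; a real version would use serde.
    fn from_row(row: &BTreeMap<String, String>) -> Result<Self, String> {
        let field = |column: &str| {
            row.get(column)
                .ok_or_else(|| format!("missing column: {}", column))
        };
        let id = field("id")?
            .parse::<i64>()
            .map_err(|e| format!("bad id: {}", e))?;
        let raw_name = field("name")?;
        // Strip the surrounding JSON quotes; reject anything with escapes.
        let name = raw_name
            .strip_prefix('"')
            .and_then(|s| s.strip_suffix('"'))
            .filter(|s| !s.contains('\\'))
            .ok_or_else(|| format!("unsupported name value: {}", raw_name))?
            .to_string();
        Ok(NotifyExample { id, name })
    }
}
```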

References

https://www.postgresql.org/docs/current/sql-notify.html

https://www.postgresql.org/docs/current/sql-listen.html

https://pgdash.io/blog/postgres-features.html

http://8kb.co.uk/blog/2015/01/16/wanting-for-a-hstore-style-delete-operator-in-jsonb/

https://github.com/FGRibreau/postgresql-to-amqp/blob/master/src/main.rs

https://schinckel.net/2014/09/29/adding-json(b)-operators-to-postgresql/

https://github.com/sfackler/rust-postgres