How to correctly listen for a postgresql notification from node.js
Asked Answered
S

3

9

Objective:
When a new record is inserted into a specific PostgreSQL table, I would like PostgreSQL to notify my node.js web application so that it can initiate an API call to an external service.

I understand the basic steps are:

  1. Establish a PostgreSQL trigger function which will perform pg_notify() method.
  2. Establish a PostgreSQL trigger which will execute the trigger function after table insert.
  3. Establish a mechanism in node.js to listen to channel-specific PostgreSQL notification.

Here is my attempt at each step:

  1. Trigger function in notify_app_after_table_insert.pgsql

    CREATE OR REPLACE FUNCTION notify_app_after_table_insert()
    RETURNS TRIGGER AS
    $BODY$
        BEGIN
            PERFORM pg_notify('channel', row_to_json(NEW)::text);
            RETURN new;
        END;
    $BODY$
    LANGUAGE plpgsql
    
  2. Trigger in trigger_notify_app_after_table_insert.sql

    CREATE TRIGGER trigger_notify_app_after_table_insert
    AFTER INSERT
    ON table
    FOR EACH ROW
    EXECUTE PROCEDURE notify_app_after_table_insert();
    
  3. Listener mechanism in index.js (inside my web app's backend)

    //tools
    const express = require('express');
    const app = express();
    const cors = require('cors');
    const bodyParser = require('body-parser');
    const port = 3001;
    const pool = require('./db'); //stores my postgresql credentials
    
    // Middleware
    app.use(cors())
    app.use(bodyParser.json())
    app.use(bodyParser.urlencoded({extended: true}))
    
    // Apply app.listen notification to console.log
    app.listen(port, () => {
        console.log(`App running on port ${port}.`)
    })
    
    // Apply channel-specific listener mechanism
    pool.connect(function(err, client, done) {
        if(err) {
            console.log(err);
        }
        client.on('notification', function(msg) {
            console.log(msg);
        })
        client.query("LISTEN channel");
        done();
    });
    

Problem:
When the backend web-app server is running and a new record is inserted in the db table, I expect to see a notification message in my web-app's terminal, but nothing appears. I suspect the problem is in the last code block of index.js, but haven't been able to isolate it.

Any suggestions on how to correctly receive the notification in index.js? Thanks in advance.

Scrunch answered 11/8, 2022 at 15:59 Comment(2)
Any luck michael?Padang
done(); closes the connection. Drop it.Eduardo
E
2

I had the same problem and I decided to use pg-listen (https://github.com/andywer/pg-listen). Here's my implementation:

PG:

CREATE TABLE active.events(
  uid UUID DEFAULT gen_random_uuid(),
  created_ts TIMESTAMP DEFAULT NOW(),
  consumed_ts TIMESTAMP NULL,
  origin VARCHAR(200) NOT NULL,
  channel VARCHAR(200) NOT NULL,
  type VARCHAR(50) NOT NULL,
  path VARCHAR(200) NOT NULL,
  payload JSONB NOT NULL,
  result JSONB,
  CONSTRAINT events_pkey PRIMARY KEY(uid),
  CONSTRAINT events_ukey UNIQUE(uid)
);
CREATE INDEX ON active.events(uid);
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE active.events TO _pg_mb_rl;
ALTER TABLE active.events OWNER TO _pg_mb_rl;

-- TRIGGER
CREATE OR REPLACE FUNCTION active.tg_notify_events()
 RETURNS TRIGGER
 LANGUAGE PLPGSQL
AS $tg_notify_events$
DECLARE
    --channel TEXT := TG_ARGV[0];
BEGIN
    PERFORM pg_notify(NEW.channel, row_to_json(NEW)::TEXT);
    UPDATE active.events SET consumed_ts = NOW() WHERE uid = NEW.uid;
  RETURN NULL;
END;
$tg_notify_events$;

CREATE OR REPLACE TRIGGER notify_events
    AFTER INSERT ON active.events
    FOR EACH ROW EXECUTE PROCEDURE active.tg_notify_events();

NODEJS:

const createSubscriber = require('pg-listen');

const channel = 'message_queue';
const subscriber = createSubscriber({ connectionString: process.env.DATABASE_URL });
subscriber.notifications.on(channel, (payload) => {
  console.log('Received notification in ' + channel, payload);
});

subscriber.events.on('error', (error) => {
  console.error('Fatal database connection error:', error)
  process.exit(1)
});

process.on('exit', () => {
  subscriber.close()
});

await subscriber.connect();
await subscriber.listenTo(channel);

Hope it helps!

Ethelethelbert answered 16/6, 2023 at 6:36 Comment(0)
S
1

I think this is because of order. Write like this:

client.query("LISTEN channel");
client.on('notification', function(msg) {
  console.log(msg);
})

For me querying for LISTEN first worked.

Syringa answered 14/6, 2023 at 11:18 Comment(4)
...not for me though, maybe there is more to it than just that.Overplus
Without testing it makes more sense to first subscribe to the notifications with LISTEN and after that register the action when a notification happens.Karie
After testing I have to say I have same problem that OP and this solution doesn't solve it.Karie
What solved it for me strangely was writing client.query('LISTEN "channel"') instead of client.query('LISTEN channel') (wrapping the channel name in double quotes)Cheremkhovo
P
1

I've done no tests, but since Goga Okradze states it works I have no reason to doubt... (apart from order of the calls, I guess the order of the calls are not relevant). Unfortunately the answer is really poor of details and I understand why it is hard to reproduce it.

The problem in OP code seems to be just the last line of code: done();: it closes the connection, so it also stops listening to events.

I bet that simply removing that call the POC will start working.

An unexpert reader could think: really never close the connection?

Sure! As long as we are interested in receiving events, the connection must be kept open.

Probably we could improve the POC adding a reconnection feature.

const addListener = () => pool.connect(function(err, client, done) {
  if(err) {
    console.log(err);

    // in case of error while connecting (DB down?), retry after 1"
    return setTimeout(addListener, 1000).unref();
  }

  // in case of error, close the client as well
  client.on('error', done);

  // when client is closed, open a new one
  client.on('end', addListener);
  // this should be improved to handle a correct server shutdown
  // in case of server shutdown,
  // probably we want to close the client without opening a new one

  client.on('notification', function(msg) {
    console.log(msg);

    // perform here actual message handling
  })

  client.query("LISTEN channel");
});

addListener();
Peripeteia answered 1/5 at 17:15 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.