Node.js rsmq - New message doesn't become visible until Node.js application is restarted
Asked Answered
C

1

5

I'm trying to make this package work.

redis version: stable 4.0.6

I connect Redis like this, there's no issues there.

pubsub.js

var redis = require("redis");
var psRedis = redis.createClient();

psRedis.auth("mypasswordishere", function (callback) {
  console.log("connected");
});

module.exports.psRedis = psRedis;

After starting Node.js application, I can see "connected" on the console and perform operations, I've checked this.

My test.js file is below.

test.js

var express = require('express');
var router = express.Router();
var path = require("path");
var bodyParser = require('body-parser');
var async1 = require("async");
var client = require("../databases/redis/redis.js").client;

var RedisSMQ = require("rsmq");

var psRedis = require("./realtime/operations/pubsub").psRedis;

var rsmq = new RedisSMQ({client: psRedis});

rsmq.createQueue({qname: "myqueue"}, function (err, resp) {
  if (resp === 1) {
    console.log("queue created");
  }
});

rsmq.receiveMessage({qname: "myqueue"}, function (err, resp) {
  if (resp) {
    console.log(resp);
  }
});

router.get('/pubsubTest', function (req, res, next) {

  async1.waterfall([
    function (callback) {

      rsmq.sendMessage({qname: "myqueue", message: "Hello World 1"}, function (err, resp) {
        if (resp) {
          console.log("Message sent. ID:", resp);
        }
      });

      callback(null, 'done!');

    }
  ], function (err, result) {
    res.sendStatus(200);
  });

});

module.exports = router;

However, when I visit /pubsubTest, only message id appears on the console.

Message sent. ID: exb289xu0i7IaQPEy1wA4O7xQQ6n0CAp

If I restart my Node.js application, I get to see the result below, which is expected. Why doesn't it appear immediately?

{ id: 'exb289xu0i7IaQPEy1wA4O7xQQ6n0CAp',
  message: 'Hello World 1',
  rc: 1,
  fr: 1515802884138,
  sent: 1515802880098 }

Thank you.

Cassock answered 12/1, 2018 at 23:39 Comment(0)
E
8

receiveMessage will not "fire". You need to call it after you have sent a message. what you are looking for is realtime option provided by rsmq.

var rsmq = new RedisSMQ({client: psRedis}, ns: "rsmq",realtime :true});

Now on every new message that is being added to a queue via sendMessage, a PUBLISH message will be sent to rsmq:rt:{qname} with the content {msg}. In your case sendMessage will emit an event namely rsmq:rt:myqueue

There can be two solutions for this , both will use the event rsmq:rt:myqueue

1.First one will use a redis client , which can subscribe to this published event with subscribe method provided by redis to implement PUB/SUB.

 var redis = require('redis');
    const subscribe = redis.createClient();
    subscribe.subscribe('rsmq:rt:myqueue');
    subscribe.on('message', function(msg) {     //msg=>'rsmq:rt:myqueue'
        rsmq.receiveMessage({qname: "myqueue"}, function (err, resp) {
            if (resp) {
                console.log(resp);
            }
        });
    });

The whole code will look something like:

var express = require('express');
var router = express.Router();
var path = require("path");
var bodyParser = require('body-parser');
var async1 = require("async");
var client = require("../databases/redis/redis.js").client;
var psRedis = require("./realtime/operations/pubsub").psRedis;
var rsmq = new RedisSMQ({client: psRedis}, ns: "rsmq",realtime :true});

rsmq.createQueue({qname: "myqueue"}, function (err, resp) {
  if (resp === 1) {
    console.log("queue created");
  }
});

const subscribe = redis.createClient( 6379,"127.0.0.1"); //creating new 
worker and pass your credentials
subscribe.subscribe('rsmq:rt:myqueue');
subscribe.on('message', function(msg) {     //msg=>'rsmq:rt:myqueue'
    rsmq.receiveMessage({qname: "myqueue"}, function (err, resp) {
        if (resp) {
            console.log(resp);
        }
    });
});

router.get('/pubsubTest', function (req, res, next) {
  async1.waterfall([
    function (callback) {
      rsmq.sendMessage({qname: "myqueue", message: "Hello World 1"}, 
function (err, 
      resp) {
        if (resp) {
          console.log("Message sent. ID:", resp);
        }});
      callback(null, 'done!');
    }
  ], function (err, result) {
    res.sendStatus(200);
  });
});
module.exports = router;

2.Second solution is to use rsmq-worker which will provide you with a message event which you can listen to using .on method.

var RSMQWorker = require( "rsmq-worker" );
var worker = new RSMQWorker( "myqueue" ,{interval:.1}); // this worker 
will poll the queue every .1 second.

worker.on( "message", function( message, next, msgid ){
     if(message){
         console.log(message);
     }
    next();
 });
worker.start();

The whole code will look something like:

var express = require('express');
var router = express.Router();
var path = require("path");
var bodyParser = require('body-parser');
var async1 = require("async");
var client = require("../databases/redis/redis.js").client;
var psRedis = require("./realtime/operations/pubsub").psRedis;
var rsmq = new RedisSMQ({client: psRedis},{ ns: "rsmq",realtime :true});

rsmq.createQueue({qname: "myqueue"}, function (err, resp) {
    if (resp === 1) {
        console.log("queue created");
    }
});

var RSMQWorker = require( "rsmq-worker" );
var worker = new RSMQWorker( "myqueue" ,{interval:.1});
worker.on( "message", function( message, next, msgid ){
        if(message){
            console.log(message);
        }
    next();
});


// optional error listeners
worker.on('error', function( err, msg ){
    console.log( "ERROR", err, msg.id );
});
worker.on('exceeded', function( msg ){
console.log( "EXCEEDED", msg.id );
});
worker.on('timeout', function( msg ){
    console.log( "TIMEOUT", msg.id, msg.rc );
});
worker.start();


router.get('/pubsubTest', function (req, res, next) {
    async1.waterfall([
        function (callback) {
            rsmq.sendMessage({qname: "myqueue", message: "Hello World1"}
            ,function (err, resp) {
                if (resp) {
                    console.log("Message sent. ID:", resp);
                }});
        callback(null, 'done!');
        }
    ], function (err, result) {
    res.sendStatus(200);
    });
});
module.exports = router;

Note: In first solution you will need to delete the message that you received from the queue using deleteMessage or you can also use popMessage which will receive the last message and delete it for you. if you do not delete the message you will get all of the messages again until the timeout is over on that particular message.

For this reason i prefer to use the second solution , rsmq takes care of this stuff for you and also you can provide your own poll interval

Extremity answered 14/2, 2019 at 3:44 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.