receiveMessage
will not "fire". You need to call it after you have sent a message.
what you are looking for is realtime option provided by rsmq.
var rsmq = new RedisSMQ({client: psRedis}, ns: "rsmq",realtime :true});
Now on every new message that is being added to a queue via sendMessage
, a PUBLISH message will be sent to rsmq:rt:{qname}
with the content {msg}
. In your case sendMessage
will emit an event namely rsmq:rt:myqueue
There can be two solutions for this , both will use the event rsmq:rt:myqueue
1.First one will use a redis client , which can subscribe to this published event with subscribe
method provided by redis to implement PUB/SUB.
var redis = require('redis');
const subscribe = redis.createClient();
subscribe.subscribe('rsmq:rt:myqueue');
subscribe.on('message', function(msg) { //msg=>'rsmq:rt:myqueue'
rsmq.receiveMessage({qname: "myqueue"}, function (err, resp) {
if (resp) {
console.log(resp);
}
});
});
The whole code will look something like:
var express = require('express');
var router = express.Router();
var path = require("path");
var bodyParser = require('body-parser');
var async1 = require("async");
var client = require("../databases/redis/redis.js").client;
var psRedis = require("./realtime/operations/pubsub").psRedis;
var rsmq = new RedisSMQ({client: psRedis}, ns: "rsmq",realtime :true});
rsmq.createQueue({qname: "myqueue"}, function (err, resp) {
if (resp === 1) {
console.log("queue created");
}
});
const subscribe = redis.createClient( 6379,"127.0.0.1"); //creating new
worker and pass your credentials
subscribe.subscribe('rsmq:rt:myqueue');
subscribe.on('message', function(msg) { //msg=>'rsmq:rt:myqueue'
rsmq.receiveMessage({qname: "myqueue"}, function (err, resp) {
if (resp) {
console.log(resp);
}
});
});
router.get('/pubsubTest', function (req, res, next) {
async1.waterfall([
function (callback) {
rsmq.sendMessage({qname: "myqueue", message: "Hello World 1"},
function (err,
resp) {
if (resp) {
console.log("Message sent. ID:", resp);
}});
callback(null, 'done!');
}
], function (err, result) {
res.sendStatus(200);
});
});
module.exports = router;
2.Second solution is to use rsmq-worker
which will provide you with a message event which you can listen to using .on
method.
var RSMQWorker = require( "rsmq-worker" );
var worker = new RSMQWorker( "myqueue" ,{interval:.1}); // this worker
will poll the queue every .1 second.
worker.on( "message", function( message, next, msgid ){
if(message){
console.log(message);
}
next();
});
worker.start();
The whole code will look something like:
var express = require('express');
var router = express.Router();
var path = require("path");
var bodyParser = require('body-parser');
var async1 = require("async");
var client = require("../databases/redis/redis.js").client;
var psRedis = require("./realtime/operations/pubsub").psRedis;
var rsmq = new RedisSMQ({client: psRedis},{ ns: "rsmq",realtime :true});
rsmq.createQueue({qname: "myqueue"}, function (err, resp) {
if (resp === 1) {
console.log("queue created");
}
});
var RSMQWorker = require( "rsmq-worker" );
var worker = new RSMQWorker( "myqueue" ,{interval:.1});
worker.on( "message", function( message, next, msgid ){
if(message){
console.log(message);
}
next();
});
// optional error listeners
worker.on('error', function( err, msg ){
console.log( "ERROR", err, msg.id );
});
worker.on('exceeded', function( msg ){
console.log( "EXCEEDED", msg.id );
});
worker.on('timeout', function( msg ){
console.log( "TIMEOUT", msg.id, msg.rc );
});
worker.start();
router.get('/pubsubTest', function (req, res, next) {
async1.waterfall([
function (callback) {
rsmq.sendMessage({qname: "myqueue", message: "Hello World1"}
,function (err, resp) {
if (resp) {
console.log("Message sent. ID:", resp);
}});
callback(null, 'done!');
}
], function (err, result) {
res.sendStatus(200);
});
});
module.exports = router;
Note: In first solution you will need to delete the message that you received from the queue using deleteMessage
or you can also use popMessage
which will receive the last message and delete it for you. if you do not delete the message you will get all of the messages again until the timeout is over on that particular message.
For this reason i prefer to use the second solution , rsmq takes care of this stuff for you and also you can provide your own poll interval