how to implement Remote procedure call (RPC) in RabbitMQ using Nodejs
Asked Answered
B

2

5

so i want to take a Json and parse it to Object and then implement a RPC RabbitMQ Server so that i can send the Object to the Server through RabbitMQ and there the Object will be proceed to be saved in a local Array and a universally unique id which will tell where exactly is the object stored, will be returned from that Server to the client throught the RPC.

the Official Webseite shows some Implementation to RPC in RabbitMQ, here you can find their Implementration https://www.rabbitmq.com/tutorials/tutorial-six-javascript.html , in the Tutorial they send a Number and the Server will calculate Fibonacci Sequence and return the Result to the Client. instead i want to send an Object, not a Number and i want to receive the universally unique id(uuid) of that Object which i ll store in a global Array in my Program, i changed the code so that it ll send an Object and return the uuid but it didnt work. i ll apreciate any help from you guys

    //this is my server_rpc.js code : 
   const amqp = require('amqplib/callback_api');
   const uuid = require("uuid/v1");

  amqp.connect('here is the url: example: localhost', (err, conn) => {

   conn.createChannel( (err, ch) => {

    let q = 'rpc_queue';

    ch.assertQueue(q, {durable: false});

    ch.prefetch(10);

    console.log(' [x] Waiting RPC requests');

    ch.consume(q, function reply(msg) {

        console.log("corralation key is: ", msg.properties.correlationId);
        let n = uuid();


        console.log(" data received ",JSON.parse(JSON.stringify(msg.content.toString())));

        console.log("corralation key is: ", msg.properties.correlationId);

        ch.sendToQueue(msg.properties.replyTo, Buffer.from(n.toString()), {correlationId: msg.properties.correlationId});

        ch.ack(msg);
    });
});

});

    // and this is my client_rpc.js code : 
    const amqp = require('amqplib/callback_api');
    const uuid = require("uuid/v1");
     const express = require("express");

let data = {
"name" : "hil01", 
"region" : "weissach",
"ID" : "1",
"version" : "0.0.1"
           } 



      amqp.connect('url: example localhost ', (err, conn) => {

        conn.createChannel( (err, ch) => {

         ch.assertQueue('', {exclusive: true}, (err, q) => {

        var corr = generateUuid();
        var newHil = JSON.stringify(data);

        console.log(" [x] Requesting uuid for the registered HIL: ", newHil );

        console.log("corralation key is: ", corr);

        ch.consume(q.queue, function(msg) {

            if(msg.properties.correlationId == corr) {

                console.log(" [.] Got %s", msg.content.toString());
                setTimeout(() => { conn.close(); process.exit(0) }, 100);
            }
        }, {noAck: true});

        ch.sendToQueue('rpc_queue', Buffer.from(newHil, {correlationId: corr, replyTo: q.queue }));
    });
});

});

    //method to generate the uuid, later will be replaced with the real 
   uuid function 
      var generateUuid = () => Math.random().toString() + 
      Math.random().toString() + Math.random().toString()  ;

when i run server_rpc, [x] waiting for requests should be printed then in a seperate cmd i run client_rpc.js then the object should be sent and the server execute and return to me the uuid back to the client.

Bolling answered 25/3, 2019 at 11:29 Comment(0)
B
7

Seems what you need here is direct reply-to RPC pattern: you want to send a message and get a response.

Here is documentation on RabbitMQ about direct reply-to: https://www.rabbitmq.com/direct-reply-to.html

TL;TR

Here is an example of a client and a server, which would work from the box:

https://github.com/Igor-lkm/node-rabbitmq-rpc-direct-reply-to

What is inside

After installation of RabbitMQ you would have 2 files to execute:

server.js

const amqp = require('amqplib');
const uuidv4 = require('uuid/v4');

const RABBITMQ = 'amqp://guest:guest@localhost:5672';

const open = require('amqplib').connect(RABBITMQ);
const q = 'example';

// Consumer
open
  .then(function(conn) {
    console.log(`[ ${new Date()} ] Server started`);
    return conn.createChannel();
  })
  .then(function(ch) {
    return ch.assertQueue(q).then(function(ok) {
      return ch.consume(q, function(msg) {
        console.log(
          `[ ${new Date()} ] Message received: ${JSON.stringify(
            JSON.parse(msg.content.toString('utf8')),
          )}`,
        );
        if (msg !== null) {
          const response = {
            uuid: uuidv4(),
          };

          console.log(
            `[ ${new Date()} ] Message sent: ${JSON.stringify(response)}`,
          );

          ch.sendToQueue(
            msg.properties.replyTo,
            Buffer.from(JSON.stringify(response)),
            {
              correlationId: msg.properties.correlationId,
            },
          );

          ch.ack(msg);
        }
      });
    });
  })
  .catch(console.warn);

client.js

const amqp = require('amqplib');
const EventEmitter = require('events');
const uuid = require('uuid');

const RABBITMQ = 'amqp://guest:guest@localhost:5672';

// pseudo-queue for direct reply-to
const REPLY_QUEUE = 'amq.rabbitmq.reply-to';
const q = 'example';

// Credits for Event Emitter goes to https://github.com/squaremo/amqp.node/issues/259

const createClient = rabbitmqconn =>
  amqp
    .connect(rabbitmqconn)
    .then(conn => conn.createChannel())
    .then(channel => {
      channel.responseEmitter = new EventEmitter();
      channel.responseEmitter.setMaxListeners(0);
      channel.consume(
        REPLY_QUEUE,
        msg => {
          channel.responseEmitter.emit(
            msg.properties.correlationId,
            msg.content.toString('utf8'),
          );
        },
        { noAck: true },
      );
      return channel;
    });

const sendRPCMessage = (channel, message, rpcQueue) =>
  new Promise(resolve => {
    const correlationId = uuid.v4();
    channel.responseEmitter.once(correlationId, resolve);
    channel.sendToQueue(rpcQueue, Buffer.from(message), {
      correlationId,
      replyTo: REPLY_QUEUE,
    });
  });

const init = async () => {
  const channel = await createClient(RABBITMQ);
  const message = { uuid: uuid.v4() };

  console.log(`[ ${new Date()} ] Message sent: ${JSON.stringify(message)}`);

  const respone = await sendRPCMessage(channel, JSON.stringify(message), q);

  console.log(`[ ${new Date()} ] Message received: ${respone}`);

  process.exit();
};

try {
  init();
} catch (e) {
  console.log(e);
}

You would have a result, like:

enter image description here

Bradleigh answered 25/3, 2019 at 18:32 Comment(0)
N
0

actually, this is not a answer of question. just for-your-information text

Changes

now 2022 amqplib API little bit changed, like below:

  • channel.responseEmitter.emit to channel.emit
  • channel.responseEmitter.once to channel.once
  • channel.responseEmitter.setMaxListeners to channel.setMaxListeners

more details see official api document


Alternatives

and if you looking for single-turn rpc (like request and response, grep response data directly).
I recommend use channel.get see detail in here

Nonchalant answered 10/11, 2022 at 2:35 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.