Listening to multiple RabbitMQ queues with Nestjs
Asked Answered
M

4

7

I was looking at Nestjs documentation to set up a microservice that listens to RabbitMQ messages. It's very straight forward when I have to listen to one queue. What if there are multiple queues that my microservice has to listen to? I was using the following method which is done in main.ts file.

await app.connectMicroservice({
    transport: Transport.RMQ,
    options: {
      urls: ['amqp://localhost:5672'],
      queue: 'q-1',
      queueOptions: {
        durable: false
      },
    },
  });

Now that I have more than one queue, I can call another connectMicroservice function to do so. However, when consuming messages in my Controller, there's no way to tell my controller to which queue to listen to (either q-1 or q-2). All I know is that there's a @MessagePattern decorator that can mention what pattern to consume in that function but not sure how to mention the queue name.

Mcmath answered 17/7, 2020 at 8:3 Comment(0)
K
17

The built in NestJS microservices implementation for RabbitMQ is a bit limited when it comes to these types of scenarios.

The @golevelup/nestjs-rabbitmq package was built specifically to address these gaps in functionality. It gives you a better integration that allows you to intuitively interact with multiple RabbitMQ Exchanges and Queues inside of a single NestJS application or microservice. It also aims to provide better support for different messaging patterns like Publish/Subscribe and RPC.

Disclaimer: I'm the author of this package

Kohl answered 17/7, 2020 at 20:21 Comment(2)
I have used it earlier,but then it doesn't support manual acknowledgment which is causing race conditions as in my MongoDB there are asynchronous calls,which alter the table ,and therefore causing Version Control errors, Please don't use itButyl
@KritarthSharma Learn more about this issue on this GH issue.Redeploy
E
4

Simply add code as follows:

await app.connectMicroservice({
    transport: Transport.RMQ,
    options: {
      urls: ['amqp://localhost:5672'],
      queue: 'q-1',
      queueOptions: {
        durable: false
      },
    },
  });
await app.connectMicroservice({
    transport: Transport.RMQ,
    options: {
      urls: ['amqp://localhost:5672'],
      queue: 'q-2',
      queueOptions: {
        durable: false
      },
    },
  });

or an even simpler solution:

for (const queue of ['q-1', 'q-2']) {
  await app.connectMicroservice({
    transport: Transport.RMQ,
    options: {
      urls: ['amqp://localhost:5672'],
      queue,
      queueOptions: {
        durable: false
      },
    },
  });
}
app.startAllMicroservices();
Eponym answered 7/12, 2021 at 12:8 Comment(1)
This does not answer the question at all, They're asking what they should do in their controller: "when consuming messages in my Controller, there's no way to tell my controller to which queue to listen to (either q-1 or q-2).". And they've said it clearly that they know that they can call connectMicroservice several times :/.Redeploy
N
1

Preface: I'm explaining this in the context of using NestJS

I understand what the intent is that you are after: wanting a microservice to listen to events emitted from other microservices. I am also using NestJS and here is my approach and how I solved that problem.

First thing to understand is that each nest microservice instance should only listen to one queue; this is by design. Let's look at why.

Imagine you have multiple nestjs app instances connected to the same message queue. Each app instance tells the message broker (Kafka, RabbitMQ, etc) that they are listening to events/messages on a specific queue, let's call it QUEUE_A. What this means for the message broker is that each of these app instances can/should be able to handle messages that comes into QUEUE_A. Now, when it is time for the message broker to dispatch a message in QUEUE_A, the message broker is going to pick any one of those app instances. What if it picks a app instance that didn't implement a handler (what you define in your controller(s) with the @MessagePattern decorator)? An error happens.

When you call app.connectMicroservice, it registers as a listener for events and messages, but you only want events from a queue, not messages.

Here is my approach: emit the same event with the same data, but to all other message/event queues.

For example, I have an api gateway and a users, auth and notifications microservice. The api gateway sends messages to the appropriate microservice and that microservice emits an event with the results.

Technically, that event could be sent to all other queues (like a copy); the only difference in the event and the data is the queue that is in. That way, each microservice can handle that same event and data in their own queue.

First, i created a broadcast service and module in a common library:

// service
@Injectable()
export class BroadcastService {
  // generic alternative to discovery mechanism
  microservice_clients: { [key:string]: ClientProxy } = {};
  
  constructor() {
    for (const microservice of available_microservices_list) {
      const client: ClientProxy = ClientProxyFactory.create(RmqService.getOptions(microservice));
      this.microservice_clients[microservice] = client;
    }
  }

  async broadcastEventToAllMicroservices(
    event: string,
    data: any,
    excluding?: MicroserviceNames[]
  ) {
    console.log(`broadcasting event...`, { event, data, excluding });
    for (const microservice of available_microservices_list) {
      const ignoreMicroservice = excluding && excluding.includes(microservice as MicroserviceNames);
      if (ignoreMicroservice) {
        continue;
      }
      const client = this.microservice_clients[microservice];
      client.emit(event, data || {});
    }
  }

  async broadcastEventToSelectMicroservices(params: {
    event: string,
    data: any,
    microserviceNamesList: MicroserviceNames[]
  }) {
    console.log(`broadcasting event...`, params);
    for (const microservice of params.microserviceNamesList) {
      if (!available_microservices_set.has(microservice)) {
        console.log(`microservice is not available for handling: ${microservice}; continuing...`);
        continue;
      }

      const client: ClientProxy = this.microservice_clients[microservice];
      if (!client) {
        console.log(`no client found by: ${microservice}; continuing...`);
        continue;
      }
      client.emit(params.event, params.data || {});
    }
  }
}

// module
@Module({
  imports: [
  ],
  providers: [BroadcastService],
  exports: [BroadcastService],
})
export class BroadcastModule {}

The broadcast service basically takes an event name and data, loops through a list of queue names and for each, create a client to connect to that queue on the message broker and emits the given event and data to that queue. It also accepts an optional list of queues to ignore. Import that broadcast module into every other nestjs app/microservice instance where you want to the new broadcast service.

Now, when i get a message to create a user in the users-microservice app instance, the service class can inject that broadcast service and broadcast the user creation event to all other queues:

// users-microservice controller
@Controller()
export class UsersMicroserviceController {
  @MessagePattern(UsersMicroserviceMessages.CREATE_USER)
  async createUser(@Payload() data: CreateUserDto, @Ctx() context: RmqContext): ServiceMethodAsyncResults {
    console.log(`UsersMicroserviceController.createUser:`, { data, context });
    const new_user = await this.usersMicroserviceService.createUser(data, context);
    console.log(`new user created`, new_user);
    return new_user;
  }
}

// service 
@Injectable()
export class UsersMicroserviceService {
  constructor (
    private usersRepository: UsersRepository,
    private rmqService: RmqService,
    private broadcastService: BroadcastService,
  ) {}

  async createUser(data: CreateUserDto, context: RmqContext): Promise<ServiceMethodResults> {
    const user = await this.usersRepository.create({ ...data, password: bcrypt.hashSync(data.password) });
    this.rmqService.ack(context);
    this.broadcastService.broadcastEventToAllMicroservices(UsersMicroserviceEvents.USER_CREATED, { user }, [MicroserviceNames.USERS]);

    return user;
  }
}

Now i'm not saying this is a recommended/optimal approach BUT it is working fine for me. Whenever a user gets created, other microservices get a copy of that event sent to their own queue where they can save that same user data in its own internal database.

Nuisance answered 22/11, 2022 at 4:26 Comment(0)
S
0

If I have understood correctly, nestjs/mircoservices always requires a direct connection to the respective services. To clarify, it would always be legitimate to create a client for each service. At least I think this makes a lot of sense for request-response.

But now to event-based: In my opinion, this is a bit limited, as different transport platforms (TCP,...,RabbitMq) are to be covered and the full possibilities cannot be used. The aim of a message broker is to be independent of the producer and consumer, and unfortunately this is lost here. Especially since different consumers within a service also listen to the event and if an event cannot be processed, it becomes very difficult to repeat the event only for this consumer.

I have therefore decided to write a small module accordingly, which should fulfill the general requirement for events.

https://github.com/cubiless/nestjs-message-broker

Maybe that would be a solution or maybe there are suggestions for improvement.

Silverware answered 28/8, 2024 at 13:16 Comment(1)
Your answer could be improved with additional supporting information. Please edit to add further details, such as citations or documentation, so that others can confirm that your answer is correct. You can find more information on how to write good answers in the help center.Bullheaded

© 2022 - 2025 — McMap. All rights reserved.