Preface: I'm explaining this in the context of NestJS.
I understand what you are after: you want a microservice to listen to events emitted by other microservices. I am also using NestJS, and here is how I solved that problem.
The first thing to understand is that each Nest microservice instance should listen to only one queue; this is by design. Let's look at why.
Imagine you have multiple NestJS app instances connected to the same message queue. Each app instance tells the message broker (Kafka, RabbitMQ, etc.) that it is listening for events/messages on a specific queue; let's call it QUEUE_A. To the message broker, this means that every one of these app instances can, and should, be able to handle any message that comes into QUEUE_A. When it is time for the broker to dispatch a message from QUEUE_A, it is going to pick any one of those app instances. What if it picks an app instance that didn't implement a handler (what you define in your controller(s) with the @MessagePattern decorator)? An error happens.
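To make that failure mode concrete, here is a minimal sketch in plain TypeScript. It does not use NestJS or a real broker: SimulatedQueue, AppInstance and the create_user pattern are hypothetical names made up for illustration, and round-robin is only an assumed dispatch strategy.

```typescript
// Plain TypeScript simulation; no NestJS or real broker involved.
type Handler = (data: unknown) => string;

// Each "app instance" maps message patterns to handlers.
type AppInstance = { name: string; handlers: Map<string, Handler> };

class SimulatedQueue {
  private next = 0;
  private readonly consumers: AppInstance[];

  constructor(consumers: AppInstance[]) {
    this.consumers = consumers;
  }

  // Assumed strategy: the broker hands each message to one consumer, in turn.
  dispatch(pattern: string, data: unknown): string {
    const consumer = this.consumers[this.next % this.consumers.length];
    if (!consumer) {
      throw new Error("queue has no consumers");
    }
    this.next += 1;
    const handler = consumer.handlers.get(pattern);
    if (!handler) {
      throw new Error(`${consumer.name} has no handler for "${pattern}"`);
    }
    return handler(data);
  }
}

// Two different apps listening on the same queue; only one knows "create_user".
const usersApp: AppInstance = {
  name: "users-app",
  handlers: new Map<string, Handler>([["create_user", () => "user created"]]),
};
const authApp: AppInstance = {
  name: "auth-app",
  handlers: new Map<string, Handler>(),
};

const queueA = new SimulatedQueue([usersApp, authApp]);

const results: string[] = [];
for (let i = 0; i < 2; i++) {
  try {
    results.push(queueA.dispatch("create_user", { email: "a@example.com" }));
  } catch (err) {
    results.push(`error: ${(err as Error).message}`);
  }
}

// The first message lands on users-app, the second on auth-app and fails.
console.log(results);
```

The second dispatch fails only because two apps with different handlers share one queue, which is why I give each microservice its own queue.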
When you call app.connectMicroservice, it registers as a listener for both events and messages, but you only want events from a queue, not messages.
Here is my approach: emit the same event with the same data, but to all other message/event queues.
For example, I have an API gateway and users, auth and notifications microservices. The API gateway sends messages to the appropriate microservice, and that microservice emits an event with the results.
Technically, that event could be sent to all other queues (like a copy); the only difference between the copies is the queue each one is in. That way, each microservice can handle that same event and data in its own queue.
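The idea can be sketched without any broker at all. In the snippet below, plain arrays stand in for the per-microservice queues; the queues object, the fanOut function and the user_created event name are hypothetical and exist only for this illustration.

```typescript
type QueuedEvent = { event: string; data: unknown };

// One in-memory array per microservice stands in for its broker queue.
const queues = {
  users: [] as QueuedEvent[],
  auth: [] as QueuedEvent[],
  notifications: [] as QueuedEvent[],
};

// Copy the same event and data into every queue except the excluded ones.
function fanOut(event: string, data: unknown, excluding: string[] = []): void {
  for (const [name, queue] of Object.entries(queues)) {
    if (excluding.includes(name)) {
      continue;
    }
    queue.push({ event, data });
  }
}

// The users microservice emits the event, so its own queue is skipped.
fanOut("user_created", { user: { id: 1 } }, ["users"]);

console.log(queues.users.length, queues.auth.length, queues.notifications.length);
```

Every queue except the sender's ends up with an identical copy of the event.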
First, I created a broadcast service and module in a common library:
// service
import { Injectable } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory } from '@nestjs/microservices';
// RmqService, MicroserviceNames, available_microservices_list and
// available_microservices_set are defined elsewhere (imports not shown).

@Injectable()
export class BroadcastService {
  // generic alternative to discovery mechanism: one client per microservice queue
  microservice_clients: { [key: string]: ClientProxy } = {};

  constructor() {
    // create every client once, up front
    for (const microservice of available_microservices_list) {
      const client: ClientProxy = ClientProxyFactory.create(RmqService.getOptions(microservice));
      this.microservice_clients[microservice] = client;
    }
  }

  // emit the event to every known queue, except the ones listed in `excluding`
  async broadcastEventToAllMicroservices(
    event: string,
    data: any,
    excluding?: MicroserviceNames[]
  ) {
    console.log(`broadcasting event...`, { event, data, excluding });
    for (const microservice of available_microservices_list) {
      const ignoreMicroservice = excluding && excluding.includes(microservice as MicroserviceNames);
      if (ignoreMicroservice) {
        continue;
      }
      const client = this.microservice_clients[microservice];
      client.emit(event, data || {});
    }
  }

  // emit the event only to the named queues, skipping unknown names
  async broadcastEventToSelectMicroservices(params: {
    event: string,
    data: any,
    microserviceNamesList: MicroserviceNames[]
  }) {
    console.log(`broadcasting event...`, params);
    for (const microservice of params.microserviceNamesList) {
      if (!available_microservices_set.has(microservice)) {
        console.log(`microservice is not available for handling: ${microservice}; continuing...`);
        continue;
      }
      const client: ClientProxy = this.microservice_clients[microservice];
      if (!client) {
        console.log(`no client found by: ${microservice}; continuing...`);
        continue;
      }
      client.emit(params.event, params.data || {});
    }
  }
}
// module
import { Module } from '@nestjs/common';

@Module({
  imports: [],
  providers: [BroadcastService],
  exports: [BroadcastService],
})
export class BroadcastModule {}
The broadcast service creates one client per queue name when it is constructed, each connected to that queue on the message broker. To broadcast, it takes an event name and data, loops through the list of queue names and emits the given event and data to each queue through its client. It also accepts an optional list of queues to ignore. Import the broadcast module into every other NestJS app/microservice instance where you want to use the broadcast service.
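The service refers to MicroserviceNames, available_microservices_list and available_microservices_set without showing them. Below is one way they might be defined, assuming one queue per microservice; the values and the selectTargets helper are hypothetical, and the real MicroserviceNames could just as well be an enum. The helper repeats only the availability check from broadcastEventToSelectMicroservices, with no broker involved.

```typescript
// Hypothetical definitions; the real ones are not shown in this answer.
const MicroserviceNames = {
  USERS: "USERS",
  AUTH: "AUTH",
  NOTIFICATIONS: "NOTIFICATIONS",
} as const;
type MicroserviceNames = (typeof MicroserviceNames)[keyof typeof MicroserviceNames];

// The list drives the loops; the set gives a fast membership check.
const available_microservices_list: MicroserviceNames[] = Object.values(MicroserviceNames);
const available_microservices_set = new Set<string>(available_microservices_list);

// Same rule as broadcastEventToSelectMicroservices: unknown names are skipped.
function selectTargets(requested: string[]): string[] {
  return requested.filter((name) => available_microservices_set.has(name));
}

// "BILLING" is not a known microservice, so only "AUTH" remains.
console.log(selectTargets([MicroserviceNames.AUTH, "BILLING"]));
```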
Now, when I get a message to create a user in the users-microservice app instance, the service class can inject that broadcast service and broadcast the user creation event to all other queues:
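// users-microservice controller
import { Controller } from '@nestjs/common';
import { Ctx, MessagePattern, Payload, RmqContext } from '@nestjs/microservices';

@Controller()
export class UsersMicroserviceController {
  // inject the service that createUser delegates to
  constructor(private usersMicroserviceService: UsersMicroserviceService) {}

  @MessagePattern(UsersMicroserviceMessages.CREATE_USER)
  async createUser(@Payload() data: CreateUserDto, @Ctx() context: RmqContext): ServiceMethodAsyncResults {
    console.log(`UsersMicroserviceController.createUser:`, { data, context });
    const new_user = await this.usersMicroserviceService.createUser(data, context);
    console.log(`new user created`, new_user);
    return new_user;
  }
}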
// service
import { Injectable } from '@nestjs/common';
import { RmqContext } from '@nestjs/microservices';

@Injectable()
export class UsersMicroserviceService {
  constructor (
    private usersRepository: UsersRepository,
    private rmqService: RmqService,
    private broadcastService: BroadcastService,
  ) {}

  async createUser(data: CreateUserDto, context: RmqContext): Promise<ServiceMethodResults> {
    // store the user with a hashed password
    const user = await this.usersRepository.create({ ...data, password: bcrypt.hashSync(data.password) });
    // acknowledge the message so the broker does not redeliver it
    this.rmqService.ack(context);
    // send the event to every queue except this microservice's own
    this.broadcastService.broadcastEventToAllMicroservices(UsersMicroserviceEvents.USER_CREATED, { user }, [MicroserviceNames.USERS]);
    return user;
  }
}
Now, I'm not saying this is a recommended/optimal approach, BUT it is working fine for me. Whenever a user gets created, every other microservice gets a copy of that event in its own queue, where it can save that same user data in its own internal database.