vert.x Wait for reply on multiple messages

In vert.x I can send a message to another verticle and "wait asynchronously" for the reply.

The problem is: I want to send messages to multiple verticles and have a single async handler called once all verticles have replied.

Is this possible or is there a better design for achieving this functionality?

EDIT:

Suppose I have a verticle A which sends messages to verticles B, C and D. Each verticle (B, C, D) does something with the message and returns some data to A. Verticle A then receives the responses from B, C and D and does something with all the data. The problem is that I have a handler for each message I send (one for B, one for C, one for D); I want one handler to be called when all the replies have arrived.

Resistless answered 7/8, 2014 at 11:2 Comment(5)
Hi there. If you want to "send" a message to multiple verticles, you should have them subscribe to a specific topic and then publish the message to all of them (see vertx.io/core_manual_java.html#publish-subscribe-messaging). As for knowing that all verticles received it: what do you mean by all? Do you know how many there are? If not, every verticle could of course respond, but only those that are really there, depending on how many you launched, whether something broke down, etc. - Godmother
Edited my question for clarification. - Resistless
Ok, what do you mean by "one handler for each message"? One class? One instance? Multiple message type handlers? Do you want it to be a stateful handler? Should the handlers that receive the data and the one that should be notified be different? If you would like a normal message flow back and forth with messages and data, but with everything reporting to one central handler, then just implement it like that, either statically because you know you want it, or by passing it as part of a message, for example. Even with the additional information, I have some trouble getting the full picture. Sorry :( - Godmother
It's like one verticle is the master: it sends orders to many slave verticles and waits until every slave does its work. I solved the problem with a 'non vert.x' approach using a Java ExecutorService but left the question open for future reference. - Resistless
Ok, but if you add your own thread management on top of the Vert.x thread handling, you are opening up a potential source of issues. And if it is a master that should wait for all slaves, why not just make a master worker verticle, send the messages from it, and wait for the responses through state locking? - Godmother

As of Vert.x 3.2, the documentation explains how to coordinate asynchronously using Future and CompositeFuture.

So let's say you want to make two send calls over the event bus and do something when both have succeeded:

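// Each send gets its own Future; completer() adapts it to the reply handler.
Future<Message<Object>> f1 = Future.future();
eventBus.send("first.address", "first message", f1.completer());

Future<Message<Object>> f2 = Future.future();
eventBus.send("second.address", "second message", f2.completer());

// The handler runs once: after both replies arrive, or as soon as one fails.
CompositeFuture.all(f1, f2).setHandler(result -> {
  if (result.succeeded()) {
    // business as usual: f1.result().body(), f2.result().body()
  }
});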

Up to 6 futures can be passed as arguments or alternatively they can be passed as a list.
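
To make the "one handler for all replies" idea concrete, here is a minimal plain-Java sketch of the kind of coordination described above. It is not how Vert.x implements CompositeFuture; ReplyCollector and its slot method are hypothetical names invented for this illustration, and the sketch assumes each slot is completed exactly once and ignores failures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;

/** Hypothetical helper: invokes one handler once all expected replies have arrived. */
final class ReplyCollector<T> {
    private final AtomicReferenceArray<T> replies;
    private final AtomicInteger remaining;
    private final Consumer<List<T>> onAll;

    ReplyCollector(int expected, Consumer<List<T>> onAll) {
        if (expected <= 0) {
            throw new IllegalArgumentException("expected must be positive");
        }
        this.replies = new AtomicReferenceArray<>(expected);
        this.remaining = new AtomicInteger(expected);
        this.onAll = onAll;
    }

    /** Returns a per-request handler that stores its reply at position index. */
    Consumer<T> slot(int index) {
        return reply -> {
            replies.set(index, reply);
            // Whichever reply arrives last triggers the combined handler.
            if (remaining.decrementAndGet() == 0) {
                List<T> all = new ArrayList<>(replies.length());
                for (int i = 0; i < replies.length(); i++) {
                    all.add(replies.get(i));
                }
                onAll.accept(all);
            }
        };
    }

    public static void main(String[] args) {
        ReplyCollector<String> collector =
            new ReplyCollector<>(3, all -> System.out.println("all replies: " + all));
        // Replies may arrive in any order; the result keeps the slot order.
        collector.slot(2).accept("from D");
        collector.slot(0).accept("from B");
        collector.slot(1).accept("from C");
        // prints "all replies: [from B, from C, from D]"
    }
}
```

In a verticle, each slot would presumably be completed from the reply handler of the corresponding send call. CompositeFuture.all covers the same need and also handles failures, so the sketch is only meant to show the mechanism.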

Leper answered 20/6, 2016 at 16:43 Comment(0)

The best approach for this is to use Reactive Extensions, as implemented by Netflix's RxJava and offered by the RxVertx module.

The huge variety of operators allows you to do things like "zipping" the results of several asynchronous calls into a new result and do whatever you want with it.

I have a simple demo available on GitHub, which contains:

final Observable<JsonObject> meters = observeMetricsSource(metricsAddress, METERS_BUS_REQUEST, "meters", rx);
final Observable<JsonObject> histograms = observeMetricsSource(metricsAddress, HISTOGRAMS_BUS_REQUEST, "histograms", rx);
subscribeAndRespondJson(zip(meters, histograms, (jo1, jo2) -> jo1.mergeIn(jo2)), req);

This snippet shows how two observables coming from two asynchronous event bus interactions get "zipped" (i.e., merged) into one final HTTP response.
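
For readers without the RxVertx module at hand, the same "zip two asynchronous results" idea can be sketched with the JDK's CompletableFuture.thenCombine. This is a rough analogue, not the demo's code: ZipSketch, fetch and zipMerged are hypothetical names, the maps stand in for JsonObject, and the values 3 and 7 are made-up placeholders.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class ZipSketch {
    /** Hypothetical stand-in for an asynchronous event bus request. */
    static CompletableFuture<Map<String, Object>> fetch(String key, Object value) {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, Object> reply = new HashMap<>();
            reply.put(key, value);
            return reply;
        });
    }

    /** Combines both replies into one map once both futures have completed. */
    static CompletableFuture<Map<String, Object>> zipMerged() {
        CompletableFuture<Map<String, Object>> meters = fetch("meters", 3);
        CompletableFuture<Map<String, Object>> histograms = fetch("histograms", 7);
        return meters.thenCombine(histograms, (m, h) -> {
            Map<String, Object> merged = new HashMap<>(m);
            merged.putAll(h);
            return merged;
        });
    }

    public static void main(String[] args) {
        // join() blocks, which is fine in a demo but not on a Vert.x event loop.
        System.out.println(zipMerged().join());
    }
}
```

As with zip, the combining function only runs after both sources have produced a value.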

Odum answered 23/8, 2014 at 22:32 Comment(0)
