ZeroMQ: can we use the inproc: transport with the pub/sub messaging pattern?

Scenario:

We are evaluating ZeroMQ (specifically JeroMQ) for an event-driven mechanism.

The application is distributed: multiple services (both publishers and subscribers are services) can exist either in the same JVM or on distinct nodes, depending on the deployment architecture.

Observation

To experiment, I created a pub/sub pattern with inproc: as the transport, using JeroMQ (version 0.3.5).

  1. The publishing thread appears to publish (at least there are no errors).
  2. The subscriber, which runs in another thread, does not receive anything.

Question

Is using inproc: along with pub/sub feasible?

I tried googling but couldn't find anything specific. Any insights?

Code sample for pub/sub with inproc:

Below is the working code sample for inproc pub/sub using JeroMQ (version 0.3.5), which may be useful for someone visiting this post later. One publisher publishes topics A and B, and two subscribers receive A and B separately.

/**
 * @param args
 */
public static void main(String[] args) {

    // The single ZMQ instance
    final Context context = ZMQ.context(1);

    ExecutorService executorService = Executors.newFixedThreadPool(3);
    //Publisher
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            startPublishing(context);
        }
    });
    // First subscriber (subscribes to topic "B")
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            startFirstSubscriber(context);
        }
    });
    // Second subscriber (subscribes to topic "A")
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            startSecondSubscriber(context);
        }
    });

}

/**
 * Prepare the publisher and publish
 * 
 * @param context
 */
private static void startPublishing(Context context) {

    Socket publisher = context.socket(ZMQ.PUB);
    publisher.bind("inproc://test");
    while (!Thread.currentThread().isInterrupted()) {
        // Write two messages, each with an envelope and content
        try {
            publisher.sendMore("A");
            publisher.send("We don't want to see this");
            LockSupport.parkNanos(1000);
            publisher.sendMore("B");
            publisher.send("We would like to see this");
        } catch (Throwable e) {

            e.printStackTrace();
        }
    }
    publisher.close();
    context.term();
}

/**
 * Prepare and receive through the subscriber
 * 
 * @param context
 */
private static void startFirstSubscriber(Context context) {

    Socket subscriber = context.socket(ZMQ.SUB);

    subscriber.connect("inproc://test");

    subscriber.subscribe("B".getBytes());
    while (!Thread.currentThread().isInterrupted()) {
        // Read envelope with address
        String address = subscriber.recvStr();
        // Read message contents
        String contents = subscriber.recvStr();
        System.out.println("Subscriber1 " + address + " : " + contents);
    }
    subscriber.close();
    context.term();

}

/**
 * Prepare and receive through the subscriber
 * 
 * @param context
 */
private static void startSecondSubscriber(Context context) {
    // Prepare the subscriber (the context is shared and passed in)

    Socket subscriber = context.socket(ZMQ.SUB);

    subscriber.connect("inproc://test");
    subscriber.subscribe("A".getBytes());
    while (!Thread.currentThread().isInterrupted()) {
        // Read envelope with address
        String address = subscriber.recvStr();
        // Read message contents
        String contents = subscriber.recvStr();
        System.out.println("Subscriber2 " + address + " : " + contents);
    }
    subscriber.close();
    context.term();

}
Peppergrass answered 17/2, 2016 at 10:18 Comment(1)
Added the sample code for the scenario for future reference. - Peppergrass

The ZMQ inproc transport is intended for use within a single process, between different threads. When you say "can exist either in the same jvm or in distinct nodes" (emphasis mine) I assume you mean that you're spinning up multiple processes as distributed services rather than multiple threads within a single process.

If that's the case, then no, what you're trying to do won't work with inproc. PUB-SUB/inproc would work fine within a single process between multiple threads.


Edit to address further questions in the comments:

The reason to use a transport like inproc or ipc is that it's a little more efficient (faster) than the tcp transport when you're in the right context to use it. You could conceivably use a mixture of transports, but you'll always have to bind and connect on the same transport to make it work.

This means that each node would need up to three PUB or SUB sockets: a tcp publisher to talk to nodes on remote hosts, an ipc publisher to talk to nodes in different processes on the same host, and an inproc publisher to talk to nodes in different threads in the same process.
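
The three-transport layout described above can be sketched as a small lookup from "where is the peer" to an endpoint string. This is only an illustration: the `Scope` enum, the `endpointFor` helper, the name `events`, the path `/tmp/events.ipc`, and port `5556` are all made up for this sketch. Only the `inproc://`, `ipc://`, and `tcp://` schemes come from ZeroMQ itself.

```java
public class TransportChooser {

    /** Where the peer lives relative to this node (hypothetical enum for illustration). */
    public enum Scope { SAME_PROCESS, SAME_HOST, REMOTE_HOST }

    /**
     * Returns the endpoint string for the given scope.
     * Both sides must use the same transport: whatever the publisher
     * binds, the subscriber has to connect to on that same transport.
     * The host argument is only used for tcp ("*" when binding,
     * the publisher's host name when connecting).
     */
    public static String endpointFor(Scope scope, String host) {
        switch (scope) {
            case SAME_PROCESS:
                return "inproc://events";          // threads sharing one context
            case SAME_HOST:
                return "ipc:///tmp/events.ipc";    // processes on the same machine
            case REMOTE_HOST:
                return "tcp://" + host + ":5556";  // anything reachable over the network
            default:
                throw new IllegalArgumentException("Unknown scope: " + scope);
        }
    }

    public static void main(String[] args) {
        // Print the endpoint a publisher would bind for each scope.
        for (Scope scope : Scope.values()) {
            System.out.println(scope + " -> " + endpointFor(scope, "*"));
        }
    }
}
```

A node that wanted to reach all three kinds of peers would create one PUB socket per returned endpoint, which is exactly the extra bookkeeping that using tcp everywhere avoids.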

Practically, in most cases you'd just use the tcp transport and only spin up one socket for everything - tcp works everywhere. It could make sense to spin up multiple sockets if each socket is responsible for a particular kind of information.

If there's a reason that you'll always be sending one message type to other threads and a different message type to other hosts, then multiple sockets make sense, but in your case it sounds like, from the perspective of one node, all other nodes are equal. In that case I would use tcp everywhere and be done with it.

Perceivable answered 17/2, 2016 at 15:21 Comment(9)
Thank you @jason. It must be a problem with my sample code; I will look into it and post it here. - Peppergrass
Also, in our case we have subscribers/publishers that are in-process and in other processes. Any hints on how to go forward in this scenario? - Peppergrass
If you're going between potentially multiple different processes, but all on the same logical server instance, then the ipc transport is your best choice. If you're going to be communicating with other computers over the network, use tcp. - Perceivable
Got it: inproc between threads, ipc between local processes, and tcp between remote processes. In our scenario there can be publishers/subscribers who want to bind to a topic, say a nasdaq event. These publishers/subscribers can be in the same process or in multiple processes (which can be on both remote and the same server instance). Can we potentially publish/subscribe to a topic over different transports? - Peppergrass
Thank you @Perceivable, I am getting a good picture. I will update here with the sample. - Peppergrass
This is so confusing. Replacing inproc with tcp instantly fixed my pub/sub issue within a single process. I was thinking that the only socket that should work with inproc/multithreading is PAIR. - Colonnade
Nope, the more general connection types are always functional in more specific circumstances, but the reverse is not always true. Per the docs, PAIR sockets are designed for inproc/inter-thread communication, which means they are not considered suitable for more general purposes (they will still function, but expect weird issues arising from expected network instability). More general socket types and transports are still functional in multi-threading; they just won't be as performant. - Perceivable
FWIW I could not get pub/sub working via inproc or tcp in the same process; receive would never return data. When I switched to push/pull or request/reply it worked fine. I've ensured the bind occurs before the connect and all that. So maybe there's something else I'm missing, but I'm going to assume it doesn't work at this point. - Beetle
It's been a while since I've worked with ZMQ, so I haven't kept up with ongoing development; it's entirely possible that there's an issue, intentional or not, with pub/sub in this context. I will say, not having seen your actual work, that pub/sub can be tricky when you're testing it out. If you're attempting to test it by sending a single message and you're not very careful to give the connection enough time to establish itself (beyond just the order of bind and connect), it can be easy to publish a message that just gets discarded. If you're waiting long enough, it may just be broken. - Perceivable
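
The "published before the connection is established" pitfall from the last comment can be pictured with a toy in-memory model. This is not ZeroMQ code and does not use JeroMQ. `ToyPublisher`, `attach`, and `send` are hypothetical names, and the only behavior modeled is the assumption that a publisher delivers to whoever is attached at send time and keeps nothing for late joiners.

```java
import java.util.ArrayList;
import java.util.List;

public class SlowJoinerModel {

    /** Toy publisher: delivers only to inboxes attached at send time, with no buffering. */
    public static class ToyPublisher {
        private final List<List<String>> inboxes = new ArrayList<>();

        /** Attaches a new subscriber and returns its inbox. */
        public List<String> attach() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        /** Copies the message to every attached inbox; with none attached it is simply lost. */
        public void send(String message) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
    }

    public static void main(String[] args) {
        ToyPublisher publisher = new ToyPublisher();
        publisher.send("early");                  // nobody attached yet, so this is dropped
        List<String> inbox = publisher.attach();  // the subscriber joins late
        publisher.send("late");
        System.out.println(inbox);                // prints [late]
    }
}
```

With a real PUB socket the window is a matter of timing rather than call order, which is why a single test message is easy to lose while a publisher that sends in a loop, like the sample in the question, eventually gets through.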
