ZeroMQ IPC across several instances of a program

I am having some problems with inter-process communication in ZeroMQ between several instances of a program.

  • I am using Linux OS
  • I am using zeromq/cppzmq, header-only C++ binding for libzmq

If I run two instances of this application (say, in two separate terminals), I give one an argument to be a listener and the other an argument to be a sender. The listener never receives a message. I have tried TCP and IPC to no avail.

#include <zmq.hpp>
#include <string>
#include <iostream>
#include <vector>    // std::vector for the pollitem list
#include <cassert>   // assert()
#include <cstdlib>   // atoi()
#include <cstdio>    // printf()
#include <cstring>   // strerror()
#include <cerrno>    // errno

int ListenMessage();
int SendMessage(std::string str);

zmq::context_t global_zmq_context(1);

int main(int argc, char* argv[] ) {
    if (argc < 2) {   // argv[1] selects the role below, so require it
        std::cerr << "usage: prog <0 = listen | other = send>\n";
        return 1;
    }
    std::string str = "Hello World";
    if (atoi(argv[1]) == 0) ListenMessage();
    else SendMessage(str);

    // NB: no explicit zmq_ctx_destroy() here -- global_zmq_context is a C++
    // zmq::context_t object, whose destructor terminates the context itself
    return 0;
}


int SendMessage(std::string str) {
    assert(global_zmq_context);
    std::cout << "Sending \n";
    zmq::socket_t publisher(global_zmq_context, ZMQ_PUB);
    assert(publisher);

    int linger = 0;
    int rc = zmq_setsockopt(publisher, ZMQ_LINGER, &linger, sizeof(linger));
    assert(rc==0);

    rc = zmq_connect(publisher, "tcp://127.0.0.1:4506");
    if (rc == -1) {
        printf ("E: connect failed: %s\n", strerror (errno));
        return -1;
    }

    zmq::message_t message(static_cast<const void*> (str.data()), str.size());
    // NB: cppzmq's send() returns a bool ( false on EAGAIN ), not the C API's -1
    if (!publisher.send(message)) {
        printf ("E: send failed: %s\n", strerror (errno));
        return -1;
    }
    return 0;
}

int ListenMessage() {
    assert(global_zmq_context);
    std::cout << "Listening \n";
    zmq::socket_t subscriber(global_zmq_context, ZMQ_SUB);
    assert(subscriber);

    int rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
    assert(rc==0);

    int linger = 0;
    rc = zmq_setsockopt(subscriber, ZMQ_LINGER, &linger, sizeof(linger));
    assert(rc==0);

    rc = zmq_bind(subscriber, "tcp://127.0.0.1:4506");
    if (rc == -1) {
        printf ("E: bind failed: %s\n", strerror (errno));
        return -1;
    }

    std::vector<zmq::pollitem_t> p = {{subscriber, 0, ZMQ_POLLIN, 0}};
    while (true) {
        zmq::message_t rx_msg;
        // when timeout (the third argument here) is -1,
        // then block until ready to receive
        std::cout << "Still Listening before poll \n";
        zmq::poll(p.data(), 1, -1);
        std::cout << "Found an item \n"; // not reaching
        if (p[0].revents & ZMQ_POLLIN) {
            // received something on the first (only) socket
            subscriber.recv(&rx_msg);
            std::string rx_str;
            rx_str.assign(static_cast<char *>(rx_msg.data()), rx_msg.size());
            std::cout << "Received: " << rx_str << std::endl;
        }
    }
    return 0;
}

This code will work if I run one instance of the program with two threads:

    std::thread t_sub(ListenMessage);
    sleep(1); // Slow joiner in ZMQ PUB/SUB pattern
    std::thread t_pub(SendMessage, str);
    t_pub.join();
    t_sub.join();

But I am wondering why, when running two instances of the program, the code above won't work?

Thanks for your help!

Oaxaca answered 7/2, 2020 at 20:57 Comment(3)
One difference is the shared global context. What if each thread gets its own? What if you don't allocate it globally? – Bellinzona
Exactly, I was worried that each instance of the program is making its own copy of that global context. – Oaxaca
I believe the context will be shared when running two threads, which is why the code above works in that case, but when running two different instances of the program, it won't because of that global context perhaps. I have no idea how to work around it. – Oaxaca

"wondering why when running two instances of the program the code above won't work?"

This code will never fly - and it has nothing to do with thread-based or process-based [CONCURRENT] processing.

It was caused by a wrong design of the Inter Process Communication.

ZeroMQ can provide for this any one of the supported transport-classes:
{ ipc:// | tipc:// | tcp:// | norm:// | pgm:// | epgm:// | vmci:// }, plus an even smarter one for in-process comms, the inproc:// transport-class, ready for inter-thread comms, where a stack-less communication may enjoy the lowest-ever latency, being just a memory-mapped policy.

The selection of an L3/L2-based networking stack for Inter-Process-Communication is possible, yet it is sort of the most "expensive" option.
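
A quick sketch of how that choice appears in code, assuming the same older cppzmq API the question already uses; nothing but the endpoint string changes ( the ipc:// path is an assumption, any writable filesystem path will do ):

    #include <zmq.hpp>

    void transport_demo() {
        zmq::context_t ctx(1);
        zmq::socket_t pub(ctx, ZMQ_PUB);
        // the endpoint string alone selects the transport-class:
        pub.bind("tcp://127.0.0.1:4506");    // full L3/L2 stack; works across hosts too
        // pub.bind("ipc:///tmp/demo.ipc");  // UNIX domain socket; same host, far cheaper
        // pub.bind("inproc://demo");        // same process AND same zmq::context_t only
    }

Note that inproc:// endpoints live inside a single zmq::context_t instance, which is exactly why that shortcut suits the two-thread case but can never join two separate processes.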


The Core Mistake

Given that choice, even a solo process ( not speaking about a pair of processes ) will collide once both of its AccessPoints attempt to .bind() onto the very same TCP/IP-address:port#
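
A minimal sketch of detecting that collision, reusing the question's hard-wired endpoint and its raw C-API calls; whichever process arrives second gets EADDRINUSE back from .bind():

    #include <zmq.hpp>
    #include <cstdio>
    #include <cstring>
    #include <cerrno>

    int main() {
        zmq::context_t ctx(1);
        zmq::socket_t sock(ctx, ZMQ_SUB);
        // while another process still holds this address, a second .bind() fails:
        if (zmq_bind(sock, "tcp://127.0.0.1:4506") == -1) {
            printf("E: bind failed: %s\n", strerror(errno)); // "Address already in use"
            return -1;
        }
        return 0;
    }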


The Other Mistake

Even for the sake of a solo programme launched, both of the spawned threads attempt to .bind() their private AccessPoints, yet neither makes an attempt to .connect() to a matching "opposite" AccessPoint.

At least one has to successfully .bind(), and
at least one has to successfully .connect(), so as to get a "channel", here of the PUB/SUB Archetype.
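
In code, the matching pair might look like this sketch ( which side .bind()-s and which .connect()-s is a free design choice; the endpoint reuses the question's hard-wired address ):

    #include <zmq.hpp>

    void listener() {                         // process A : owns the AccessPoint
        zmq::context_t ctx(1);
        zmq::socket_t sub(ctx, ZMQ_SUB);
        zmq_setsockopt(sub, ZMQ_SUBSCRIBE, "", 0);
        sub.bind("tcp://127.0.0.1:4506");     // one side .bind()-s ...
    }

    void sender() {                           // process B : dials the "opposite" side
        zmq::context_t ctx(1);
        zmq::socket_t pub(ctx, ZMQ_PUB);
        pub.connect("tcp://127.0.0.1:4506");  // ... the other side .connect()-s
    }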


To-do

  • decide about a proper, right-enough Transport-Class ( best avoid the overkill of operating the full L3/L2-stack for localhost/in-process IPC )
  • refactor the Address:port# management ( for 2+ processes not to fail on .bind()-(s) to the same ( hard-wired ) address:port# ); a corrected sketch follows this list
  • always detect and handle appropriately the returned {PASS|FAIL}-s from API calls
  • always set LINGER to zero explicitly ( you never know )
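
Putting the to-do list together, one possible refactoring might look like the sketch below. It is only a sketch under stated assumptions, not a definitive implementation: the ipc:// path, the usage string and the one-second slow-joiner pause are additions of mine, and it leans on the same older cppzmq API the question uses.

    #include <zmq.hpp>
    #include <string>
    #include <iostream>
    #include <cassert>
    #include <cstdlib>    // atoi()
    #include <unistd.h>   // sleep()

    // an ipc:// endpoint avoids both the L3/L2-stack overhead and the TCP port clash
    // ( a stale socket file left by a crashed run may need removing first )
    static const char *ENDPOINT = "ipc:///tmp/demo_pubsub.ipc";

    int main(int argc, char *argv[]) {
        if (argc < 2) { std::cerr << "usage: prog <0 = listen | other = send>\n"; return 1; }
        zmq::context_t ctx(1);                   // one private context per process
        int linger = 0;
        int rc = 0;

        if (atoi(argv[1]) == 0) {                // listener: the side that .bind()-s
            zmq::socket_t sub(ctx, ZMQ_SUB);
            rc = zmq_setsockopt(sub, ZMQ_SUBSCRIBE, "", 0);
            assert(rc == 0);
            rc = zmq_setsockopt(sub, ZMQ_LINGER, &linger, sizeof(linger));
            assert(rc == 0);
            sub.bind(ENDPOINT);                  // cppzmq throws zmq::error_t on failure
            zmq::message_t msg;
            sub.recv(&msg);                      // blocks; exits after one message
            std::cout << "Received: "
                      << std::string(static_cast<char *>(msg.data()), msg.size())
                      << std::endl;
        } else {                                 // sender: the side that .connect()-s
            zmq::socket_t pub(ctx, ZMQ_PUB);
            rc = zmq_setsockopt(pub, ZMQ_LINGER, &linger, sizeof(linger));
            assert(rc == 0);
            pub.connect(ENDPOINT);
            sleep(1);                            // PUB/SUB slow-joiner: let the
                                                 //   subscription propagate first
            std::string str = "Hello World";
            zmq::message_t msg(str.data(), str.size());
            pub.send(msg);
        }
        return 0;                                // the context_t destructor terminates ctx
    }

Start the listener first ( argument 0 ) in one terminal, then the sender ( any other argument ) in another, so the PUB-side has a live subscriber by the time it sends.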
Lasala answered 8/2, 2020 at 8:38 Comment(4)
First, thanks a lot for your clear answer and reasoning. Correct, I have tried all combinations of connects and binds ( usually bind is for listening and connect for broadcasting ), but this still hasn't solved the issue. Also, I've edited the code to detect the exceptions from API calls ( now reflected in the code above ), as well as added LINGER and changed the tcp transport. – Oaxaca
NB: I've tried LINGER on the binding socket, the connecting socket, and both. Still isn't working :/ – Oaxaca
LINGER is a defensive step ( it saves your O/S resources from infinite hangups in case of un-coordinated .close() or .term(), or even a crash ). Next, there is no principal difference in which side .bind()-s or .connect()-s. What error-codes do you receive from the API on NACK-s? Last week I remember someone here who observed an error in the C++ wrapper/binding and started to use the pure C-lang API interfacing; did you read that, or test the same ( root-cause isolation ) steps? – Lasala
Agreed that messing with LINGER can lead to other weird behaviour; it's been added for a reason, and setting it to 0 defeats it. Instead it's best to explicitly use shutdown() in your code, and let LINGER backstop you as it's designed to do in case of a communications failure. – Scruffy
