C++ and Python ZeroMQ 4.x PUB/SUB example does not work

I could only find old C++ source examples, so I wrote my own based on them. Here's my publisher in Python:

import zmq
from time import sleep
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5563")

while True:
    msg = "hello"
    socket.send_string(msg)
    print("sent "+ msg)
    sleep(5)

And here's the subscriber in C++:

void * ctx = zmq_ctx_new();
void * subscriber = zmq_socket(ctx, ZMQ_SUB);
// zmq_connect(subscriber, "tcp://*:5563");
   zmq_connect(subscriber, "tcp://localhost:5563");
// zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", sizeof(""));

while (true) {
    zmq_msg_t msg;
    int rc;

    rc = zmq_msg_init( & msg);
    assert(rc == 0);
    std::cout << "waiting for message..." << std::endl;
    rc = zmq_msg_recv( & msg, subscriber, 0);
    assert(rc == 1);

    std::cout << "received: " << (char * ) zmq_msg_data( & msg) << std::endl;
    zmq_msg_close( & msg);
}

Initially, I tried zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", sizeof("") ); but I guess I should receive everything if I don't set this, right? So I commented it out.

When I run the code, I see "waiting for message..." forever.

I tried listening to the TCP traffic with tcpdump. It turns out that while the publisher is running, I see a lot of garbage on port 5563, and when I turn the publisher off, it stops. When I tried a PUSH/PULL scheme, I could see the plaintext message in tcpdump. (I tried pushing with Node.js and pulling with C++, and it worked.)

What could I be doing wrong?

I tried different combinations of .bind(), .connect(), localhost, 127.0.0.1, but they won't work either.

UPDATE: I've just read that I must subscribe to something, so I called zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, NULL, 0 ); to subscribe to everything, but I still receive nothing.

PyZMQ is in version 17.0.0.b3 and has ZeroMQ 4.2.3

C++ has ZeroMQ 4.2.2

UPDATE 2:

Updated both to 4.2.3; it still doesn't work.

Costello answered 25/12, 2017 at 20:43 Comment(0)

It's me, the one who asked the question.

I managed to get it working by changing socket.bind("tcp://*:5563") to socket.connect("tcp://dns_address_of_my_docker_container:5564") in Python,

and changing zmq_connect(subscriber, "tcp://localhost:5563") to zmq_bind(subscriber, "tcp://*:5563") in C++.

The examples I found online said that I should use bind for the publisher and connect for the subscriber, but that would not work for me no matter what I tried. Does anyone have an idea why?

ZeroMQ documentation says the following:

The zmq_bind() function binds the socket to a local endpoint and then accepts incoming connections on that endpoint.

The zmq_connect() function connects the socket to an endpoint and then accepts incoming connections on that endpoint.

I don't have a clear idea of what changed, but it worked.

Costello answered 10/1, 2018 at 21:30 Comment(0)

"I guess I should receive everything if I don't set this, right?"

No, this is not a correct assumption. A SUB socket receives nothing until at least one ZMQ_SUBSCRIBE topic-filter is set. My other ZeroMQ posts cover the related { plain-string | unicode | serialisation } issues and the { performance | traffic } impacts of where the topic-filter is actually processed ( on the SUB-side in early ZeroMQ versions, on the PUB-side in more recent ones ), which one may encounter when designing heterogeneous distributed systems with ZeroMQ.

( Any other Scalable Formal Communication Archetype Pattern, like the PUSH/PULL you observed, does nothing with the subscription policy, so it will work independently of any subscription matching against a topic-filter list. )
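To make the subscription rule concrete, here is a minimal plain-Python model of the prefix matching a SUB socket applies. It is only a sketch and involves no ZeroMQ at all: the function name is_delivered and the sample prefixes are hypothetical, chosen for illustration.

```python
def is_delivered(message: bytes, subscriptions: set) -> bool:
    """Model of SUB filtering: deliver if any subscribed prefix matches.

    This is an illustrative stand-in, not a ZeroMQ API.
    """
    return any(message.startswith(prefix) for prefix in subscriptions)


# No subscription at all: every message is dropped.
print(is_delivered(b"hello", set()))
# Empty prefix: matches every message.
print(is_delivered(b"hello", {b""}))
# Specific prefix: only messages starting with it get through.
print(is_delivered(b"hello", {b"he"}))
print(is_delivered(b"hello", {b"status"}))
```

The first case is the situation in the question: with the ZMQ_SUBSCRIBE call commented out, the set of filters is empty, so nothing is ever delivered.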


Step 0: Test the sending-part RTO first, if it .send()-s anything at all:

Let's mock up a quick Python receiver to see whether the sender indeed sends anything down the line:

import zmq

aContext = zmq.Context()                                          # .new Context
aSUB     = aContext.socket( zmq.SUB )                             # .new Socket
aSUB.connect( "tcp://127.0.0.1:5563" )                            # .connect
aSUB.setsockopt( zmq.LINGER, 0 )                                  # .set ALWAYS!
aSUB.setsockopt( zmq.SUBSCRIBE, b"" )                             # .set T-filter ( bytes, works on Python 2 and 3 )

MASK = "INF: .recv()-ed this:[{0:}]\n:     waited {1: > 7d} [us]"
aClk = zmq.Stopwatch(); 

while True:
      try:
           aClk.start(); print( MASK.format( aSUB.recv(),
                                             aClk.stop()
                                             ) )
      except ( KeyboardInterrupt, SystemExit ):
           pass
           break
pass
aSUB.close()                                                     # .close ALWAYS!
aContext.term()                                                  # .term  ALWAYS!

This ought to report whatever the PUB-sender is actually .send()-ing over the wire, and also the actual message inter-arrival times ( in [us]; glad ZeroMQ includes this tool for debugging and performance / latency tweaking ).

If you see the live INF:-messages actually ticking on screen, the sender is confirmed to work; keep it running, and it now makes sense to proceed to the next step.


Step 1: Test the receiving-part code next:

#include <zmq.h>
#include <cassert>
#include <iostream>
#include <string>

int main() {
    void *aContext = zmq_ctx_new();                                     std::cout << "INF: .. zmq_ctx_new() done" << std::endl;
    void *aSUB     = zmq_socket(     aContext, ZMQ_SUB );               std::cout << "INF: .. zmq_socket() done" << std::endl;
    int   aLinger  = 0;                                                 /* ZMQ_LINGER expects a pointer to an int */
    int   rc;
    rc = zmq_connect(    aSUB, "tcp://127.0.0.1:5563" );                assert (rc == 0 && "EXC: in zmq_connect() call" );
                                                                        std::cout << "INF: .. zmq_connect() done" << std::endl;
    rc = zmq_setsockopt( aSUB, ZMQ_SUBSCRIBE, "", 0 );                  assert (rc == 0 && "EXC: in zmq_setsockopt( ZMQ_SUBSCRIBE ) call" );
                                                                        std::cout << "INF: .. zmq_setsockopt( ZMQ_SUBSCRIBE, ... ) done" << std::endl;
    rc = zmq_setsockopt( aSUB, ZMQ_LINGER, &aLinger, sizeof( aLinger ) ); assert (rc == 0 && "EXC: in zmq_setsockopt( ZMQ_LINGER ) call" );
                                                                        std::cout << "INF: .. zmq_setsockopt( ZMQ_LINGER, ...    ) done" << std::endl;
    while (true) {
        zmq_msg_t msg;                                                  /* Create an empty ØMQ message */
        rc = zmq_msg_init (&msg);          assert (rc ==  0 && "EXC: in zmq_msg_init() call" );
                                           std::cout << "INF: .. zmq_msg_init() done" << std::endl;
        rc = zmq_msg_recv (&msg, aSUB, 0); assert (rc != -1 && "EXC: in zmq_msg_recv() call" );
        /* The payload is not NUL-terminated, so build the string from data + size */
        std::string text( static_cast<char *>( zmq_msg_data( &msg ) ), zmq_msg_size( &msg ) );
                                           std::cout << "INF: .. zmq_msg_recv() done: received [" << text << "]" << std::endl;
        zmq_msg_close (&msg);                                           /* Release message */
                                           std::cout << "INF: .. zmq_msg_close()'d" << std::endl;
    }
    zmq_close(    aSUB );                  std::cout << "INF: .. aSUB was zmq_close()'d" << std::endl;
    zmq_ctx_term( aContext );              std::cout << "INF: .. aCTX was zmq_ctx_term()'d" << std::endl;
    return 0;
}
Juratory answered 29/12, 2017 at 11:46 Comment(0)

What is the return value for zmq_setsockopt()?

Also, you should pass "" rather than NULL; they are different.

zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", 0 );

As the API documentation states:

Return value

The zmq_setsockopt() function shall return zero if successful. Otherwise it shall return -1 and set errno to one of the values defined below.
...

Mitziemitzl answered 29/12, 2017 at 1:37 Comment(0)

The correct recipe to run the PUB/SUB pattern (regardless of language) is:

Pub

  1. socket(zmq.PUB)
  2. bind("tcp://127.0.0.1:5555")
  3. encoding (usually just encode() for strings, but you can also compress() or dumps() for objects, or even both):
       encoded_topic = topic.encode()
       encoded_msg = msg.encode()
  4. send_multipart([encoded_topic, encoded_msg])

Sub

  1. socket(zmq.SUB)
  2. setsockopt(zmq.SUBSCRIBE, topic.encode())
  3. connect("tcp://127.0.0.1:5555")
  4. answer = recv_multipart()
  5. decode the answer:
       enc_topic, enc_msg = answer
       topic = enc_topic.decode()
       msg = enc_msg.decode()

In general, steps Pub - 2 / Sub - 3 (i.e. bind/connect) and Pub - 3 / Sub - 5 (i.e. encode/decode or dumps/loads) need to be complementary to each other for things to work.
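The encode/decode pairing from the recipe above can be sketched in plain Python without any sockets. This is only an illustration under stated assumptions: the helper names encode_frames and decode_frames, the "weather" topic, and the choice of JSON for the dumps/loads step are all hypothetical. The list built on the publisher side is what would be handed to send_multipart(), and the subscriber side receives the same list from recv_multipart().

```python
import json


def encode_frames(topic: str, payload: dict) -> list:
    """Publisher side (Pub step 3): turn topic and payload into byte frames."""
    return [topic.encode("utf-8"), json.dumps(payload).encode("utf-8")]


def decode_frames(frames: list) -> tuple:
    """Subscriber side (Sub step 5): reverse exactly what the publisher did."""
    enc_topic, enc_msg = frames
    return enc_topic.decode("utf-8"), json.loads(enc_msg.decode("utf-8"))


# Hypothetical topic and payload, for illustration only.
frames = encode_frames("weather", {"temp_c": 21})
topic, payload = decode_frames(frames)
print(topic, payload)
```

If the publisher switched to a different serialisation (for example adding compression), decode_frames would have to change in step with it; that is the sense in which the two sides must be complementary.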

Pean answered 4/1, 2018 at 22:35 Comment(0)
