What do I need to do to get ZMQ_RADIO / ZMQ_DISH to work properly?

I'm attempting to use the ZMQ draft socket types ZMQ_RADIO and ZMQ_DISH. I built libzmq and cppzmq with CMake ExternalProject and the flag ENABLE_DRAFTS=ON, and verified that the library was built with drafts using the zmq_has() function. I modified the standard hello world example to use radio and dish, but cannot get them to talk. I also get compilation errors that ZMQ_RADIO and ZMQ_DISH are undefined. I defined them manually and it compiles, but I never get an actual connection, so it seems like something else is wrong.

Here's my code:

CMakeLists.txt

cmake_minimum_required(VERSION 2.8.11)
project(zmq_udp)

include(ExternalProject)

ExternalProject_Add(libzmq
    GIT_REPOSITORY https://github.com/zeromq/libzmq
    GIT_TAG master
    CMAKE_ARGS 
      -DENABLE_DRAFTS=ON
      -DWITH_PERF_TOOL=OFF 
      -DZMQ_BUILD_TESTS=OFF 
      -DENABLE_CPACK=OFF
      -DCMAKE_INSTALL_PREFIX=${CMAKE_BINARY_DIR}/zmq
      -DCMAKE_LIBRARY_OUTPUT_DIRECTORY=${CMAKE_BINARY_DIR}/zmq/lib
      -DCMAKE_C_FLAGS=${CMAKE_C_FLAGS}
      -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}
      -DCMAKE_SHARED_LINKER_FLAGS=${CMAKE_SHARED_LINKER_FLAGS}
)

ExternalProject_Add(cppzmq
    GIT_REPOSITORY https://github.com/zeromq/cppzmq
    GIT_TAG master
    CONFIGURE_COMMAND ""
    BUILD_COMMAND ""
    INSTALL_COMMAND ${CMAKE_COMMAND} -E copy <SOURCE_DIR>/zmq.hpp ${CMAKE_BINARY_DIR}/zmq/include/zmq.hpp
    TEST_COMMAND ""
)

add_dependencies(cppzmq libzmq)

set(ZEROMQ_LIBNAME "libzmq.so")
set(ZEROMQ_INCLUDE_DIRS ${CMAKE_BINARY_DIR}/zmq/include)
set(ZEROMQ_LIBRARIES ${CMAKE_BINARY_DIR}/zmq/lib/${ZEROMQ_LIBNAME})

include_directories(${ZEROMQ_INCLUDE_DIRS})

add_executable(server server.cpp)
add_executable(client client.cpp)
add_dependencies(server cppzmq)
add_dependencies(client cppzmq)
target_link_libraries(server ${ZEROMQ_LIBRARIES})
target_link_libraries(client ${ZEROMQ_LIBRARIES})

server.cpp

#include <zmq.hpp>
#include <string>
#include <iostream>

#define ZMQ_DISH 15

int main ()
{
    std::cout << zmq_has("draft") << std::endl;

    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_DISH);
    socket.bind ("udp://127.0.0.1:5555");

    while (true)
    {
        zmq::message_t request;

        socket.recv (&request);
        std::cout << "Received Hello" << std::endl;
    }

    return 0;
}

client.cpp

#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>

#define ZMQ_RADIO 14

int main ()
{
    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_RADIO);

    std::cout << "Connecting to hello world server…" << std::endl;
    socket.connect ("udp://127.0.0.1:5555");

    for (int request_nbr = 0; request_nbr != 10; request_nbr++)
    {
        zmq::message_t request (5);
        memcpy (request.data (), "Hello", 5);
        std::cout << "Sending Hello " << request_nbr << "…" << std::endl;
        socket.send (request);

        sleep(1);
    }

    return 0;
}

The server outputs a 1 as expected for the zmq_has() function, which should verify libzmq was built with the draft API mode on.

What do I need to do to get RADIO/DISH to work properly?

I'd like to use ZMQ on a project as a UDP receiver to receive some UDP packets from a non-ZMQ application.

Predestinate answered 17/7, 2017 at 20:56 Comment(2)
"I defined them manually ..." Are you sure that is a good idea? - Eolande
I defined the socket types manually just in an effort to diagnose the issue. - Predestinate

The RADIO and DISH socket types are in draft state and not available in stable builds. If you need to access the DRAFT API, build zmq from this link

The following is part of zmq.h, which zmq.hpp includes:

// These functions are DRAFT and disabled in stable releases, and subject to
// change at ANY time until declared stable.
#ifdef ZMQ_BUILD_DRAFT_API

// DRAFT Socket types.
#define ZMQ_SERVER 12
#define ZMQ_CLIENT 13
#define ZMQ_RADIO 14
#define ZMQ_DISH 15
#define ZMQ_GATHER 16
#define ZMQ_SCATTER 17
#define ZMQ_DGRAM 18
#endif
Claptrap answered 6/9, 2017 at 17:8 Comment(0)

Here is a working example that uses both socket types. I have already tested it.

The most important thing to check is that you are not loading your system zmq library by accident. In my case I did the following in CMake:

set(ENABLE_DRAFTS ON)

add_subdirectory(libzmq)
set_target_properties(libzmq PROPERTIES PREFIX "dev-")

# If you want target
add_library(CppZeroMQ INTERFACE)

target_link_libraries(CppZeroMQ INTERFACE $<$<CONFIG:Debug>:libzmq>$<$<CONFIG:Release>:libzmq-static>)
# For CPP headers (you may install and change path here)
target_include_directories(CppZeroMQ INTERFACE my/path/to/cppzmq)
target_compile_definitions(CppZeroMQ INTERFACE ZMQ_BUILD_DRAFT_API=1)

Thanks to the "dev-" prefix, the build distinguishes between the system library and the one you built from source. The system library probably isn't built with DRAFTS enabled.


Server/Publisher part

    zmq::context_t context(1);
    zmq::socket_t publisher(context, ZMQ_RADIO);
    // We need to set the destination IP, sad but true
    publisher.connect("udp://127.0.0.1:30666");

    std::string text;
    text.reserve(128);

    int number = 0;
    while(publisher.connected())
    {
        std::chrono::microseconds timestamp = std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::system_clock::now().time_since_epoch());

        text.clear();
        text += std::to_string(timestamp.count());
        text += ";";
        text += std::to_string(++number);

        zmq::message_t update{text.data(), text.size()};
        update.set_group("test");
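        // Streaming a std::chrono duration directly needs C++20, so print the raw count
        std::cout << "Sending: " << timestamp.count() << " number:" << number << std::endl;
        publisher.send(update);
        std::this_thread::sleep_for(std::chrono::seconds(1));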
    }

Client/Subscriber part

    zmq::context_t context(1);
    zmq::socket_t subscriber(context, ZMQ_DISH);
    subscriber.bind("udp://*:30666");
    subscriber.join("test");

    int previousNumber = 0;
    int lostCount = -1;

    while(subscriber.connected())
    {
        zmq::message_t update;

        subscriber.recv(&update);

        std::string_view text(update.data<const char>(), update.size());
        std::cout << text;

        auto splitPoint = text.find(';');
        std::string serverTime = std::string{text.substr(0, splitPoint)};
        std::string serverNumber = std::string{text.substr(splitPoint + 1)};
        auto number = std::stoi(serverNumber);
        if(number != previousNumber + 1)
        {
            ++lostCount;
        }
        previousNumber = number;

        // Time elapsed since the sender stamped the message
        const auto diff =
            std::chrono::system_clock::now() -
            std::chrono::system_clock::time_point{std::chrono::microseconds{std::stoull(serverTime)}};

        // Beautify at: https://github.com/gelldur/common-cpp/blob/master/src/acme/beautify.h
        std::cout << " ping:" << Beautify::nice{diff} << " UDP lost: " << lostCount << std::endl;
    }
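The "timestamp;number" payload used above can be parsed and checked for gaps without any ZeroMQ dependency. The following is only a minimal sketch: `Update`, `parse_update` and `LossCounter` are hypothetical names introduced here for illustration, and unlike the loop above, which counts gaps, it counts the number of missing sequence numbers.

```cpp
#include <exception>
#include <string>

// Hypothetical container for one decoded "timestamp;number" payload.
struct Update {
    unsigned long long timestamp_us; // sender time in microseconds since epoch
    int number;                      // sender sequence number, starting at 1
};

// Parses "timestamp;number" into `out`. Returns false on malformed input
// instead of letting std::stoull / std::stoi throw.
inline bool parse_update(const std::string& text, Update& out)
{
    const std::string::size_type split = text.find(';');
    if (split == std::string::npos || split == 0 || split + 1 >= text.size()) {
        return false;
    }
    try {
        out.timestamp_us = std::stoull(text.substr(0, split));
        out.number = std::stoi(text.substr(split + 1));
    } catch (const std::exception&) {
        return false;
    }
    return true;
}

// Counts sequence numbers that were skipped. A datagram that arrives late or
// out of order is ignored; it does not reduce the count again.
struct LossCounter {
    int previous = 0; // last sequence number seen
    int lost = 0;     // total skipped sequence numbers

    void observe(int number)
    {
        if (number > previous + 1) {
            lost += number - previous - 1;
        }
        if (number > previous) {
            previous = number;
        }
    }
};
```

In the receive loop, the string built from `update.data<const char>()` would be passed to `parse_update`, and the resulting `number` to `LossCounter::observe`.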
Salmons answered 16/7, 2019 at 6:34 Comment(0)

Houston, we have a problem:

I am not familiar with conditional builds and with including the draft API(s) in recent ZeroMQ versions. If it were indeed intended to work the way you assume, the #define-s ought to have been provided there already, oughtn't they?

Maybe you have dug the correct #define ordinals for ZMQ_RADIO + ZMQ_DISH, compatible with the core functions, out of some GitHub source, but the general approach of just manually writing:

#define                        A_NOT_IMPLEMENTED_CABLE_TV_BROADCAST_ARCHETYPE -1234
void   *dsh = zmq_socket( ctx, A_NOT_IMPLEMENTED_CABLE_TV_BROADCAST_ARCHETYPE );
assert( dsh              && "INF: a socket instantiation from [ctx] failed." );

int     rc  = zmq_bind( dsh, "udp://*:5555" );
assert( rc == 0          && "INF: a socket .bind( 'udp://*:5555' ) failed." );

sounds quite suspicious, even with the promise of the flag ENABLE_DRAFTS=ON, doesn't it?


Summary

If your project aims at using RADIO/DISH, carefully review the published API (including the warnings about not-implemented / not-released features), where you will also find the other mandatory steps:

Radio-dish is using groups (vs Pub-sub topics), Dish sockets can join a group and each message sent by Radio sockets belong to a group.

Groups are null terminated strings limited to 16 chars length (including null). The intention is to increase the length to 40 chars (including null).

Groups are matched using exact matching (vs prefix matching of PubSub).

ZMQ_RADIO side must use zmq_msg_set_group(3) to first assign a message to a group.

ZMQ_DISH side must call zmq_join(3) to "enter" a group before it can receive any message, as by default there is, obviously, no membership upon its instantiation.

ZMQ_DISH side may use a call to zmq_msg_group(3) to get the group the message actually belongs to.
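The group rules quoted above can be illustrated without ZeroMQ at all. The following is only a sketch under stated assumptions: `is_valid_group`, `dish_accepts` and `sub_accepts` are hypothetical helpers, not part of libzmq, and the 15-character limit is taken from the text quoted above, so it may differ in the libzmq version you actually build.

```cpp
#include <cstddef>
#include <string>

// Assumption from the quoted rules: 16 chars including the terminating null,
// which leaves 15 visible characters.
constexpr std::size_t kMaxGroupLength = 15;

// Hypothetical check: the name fits the quoted limit and has no embedded null.
inline bool is_valid_group(const std::string& group)
{
    return group.size() <= kMaxGroupLength &&
           group.find('\0') == std::string::npos;
}

// RADIO/DISH style: a joined group matches only the identical group name.
inline bool dish_accepts(const std::string& joined, const std::string& message_group)
{
    return joined == message_group;
}

// PUB/SUB style, for contrast: a subscription matches every topic that
// starts with it.
inline bool sub_accepts(const std::string& subscription, const std::string& topic)
{
    return topic.compare(0, subscription.size(), subscription) == 0;
}
```

With these helpers, a DISH that joined "test" would not see a message sent to the group "test2", whereas a SUB socket subscribed to "test" would receive a message published on the topic "test2".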


ZeroMQ is a work in progress, so you may want to check similar services.

If you are in need and in a hurry, Martin Sustrik has initiated another smart messaging/signalling tool, nanomsg.

After some troubles, nanomsg seems to have rolled out a production release, where Scalable Formal Communication Patterns may help you in your project goals. Worth a try.

Isham answered 18/7, 2017 at 12:25 Comment(4)
The main goal is just a simple UDP receiver without using cumbersome libraries or re-inventing socket communication. The project already uses ZeroMQ for other reasons so it seemed nice to possibly re-use it for the needed UDP receive duty. I understand it's a work in progress and am ok with that, it's just not well documented (which is understandable and also why I asked the question here). I'm considering LCM as an alternative but will check out nanomsg too. Thanks. - Predestinate
Glad to help robots get out there into outer space :o) Have you tried first to setup properly the calls to zmq_msg_set_group() on the ZMQ_RADIO-side + the zmq_join() on the ZMQ_DISH-side to work as defined? - Isham
So, I'm reviewing the documentation I've found and it's obviously different from the documentation you're looking at. Would you mind linking to where you found this documentation (or did you just glean it from the code?). - Predestinate
If you are familiar with PUB/SUB topic-filter subscriptions, the new RADIO/DISH is just trying to avoid .setsockopt( zmq.SUBSCRIBE, <aTopicFilterLeftMatchingFRAGMENT> ) and exposes another API mechanism, via a socket-related call to zmq_join(), adding some further limitations ( exact match, NULL-termination, zero support for multi-part messages ). But it shall work for you. - Isham
