How can I use send_json with pyzmq PUB/SUB?

I need to send a dictionary as the message from a publisher to subscribers. With the REQ/REP pattern, send_json and recv_json work nicely, but I can't find an incantation that works for PUB/SUB. I hope it's not the case that PUB/SUB can only work with send() and recv().

Here's the listing for the experiment I put together:

"""
Experiments with 0MQ PUB/SUB pattern
"""
import os
import sys
import time
import zmq
from multiprocessing import Process
from random import sample, choice

import signal
def handler(signum, frame):
    """ Handler for SIGTERM """
    # kill the processes we've launched
    try:
        for name, proc in _procd.iteritems():
            if proc and proc.is_alive():
                proc.terminate()
    finally:            
        os._exit(0)

signal.signal(signal.SIGTERM, handler)  
PORT = 5566
TOPICS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"

def publisher():
    """ Randomly update and publish topics """
    context = zmq.Context()
    sock = context.socket(zmq.PUB)
    sock.bind("tcp://*:{}".format(PORT))
    ## Init a dict of topic:value pairs
    alltopics = dict()
    for char in TOPICS:
        alltopics[char] = time.time()

    while True:
        topic = choice(TOPICS)
        alltopics[topic] = time.time()
        ## THIS IS SENDING
        sock.send_json((topic, alltopics))
        print "Sent topic {}".format(topic)
        time.sleep(1)


def client(number, topics):
    """ 
    Subscribe to list of topics and wait for messages. 
    """
    context = zmq.Context()
    sock = context.socket(zmq.SUB)
    sock.connect("tcp://localhost:{}".format(PORT))
    for topic in topics:
        sock.setsockopt(zmq.SUBSCRIBE, topic)

    print "subscribed to topics {}".format(topics)    

    while True:
        ## THIS NEVER RETURNS
        print sock.recv_json() 

        ## ALREADY TRIED THIS. DOES NOT WORK  
        #topic, msg = sock.recv_json()
        #print  "Client{}  {}:{}".format(number, topic, msg[topic])

        sys.stdout.flush()

if __name__ == '__main__':
    _procd = dict()
    ## Launch publisher
    name = 'publisher'
    _procd[name] = Process(target=publisher, name=name)
    _procd[name].start()

    ## Launch the subscribers
    for n in range(10):
        name = 'client{}'.format(n)
        _procd[name] = Process(target=client,
                               name=name,
                               args=(n, sample(TOPICS,3)))
        _procd[name].start()



    ## Sleep until killed
    while True:
        time.sleep(1)    

And here is the output up to the point where I kill the parent process:

$ python pubsub.py
Sent topic Y
subscribed to topics ['B', 'Q', 'F']
subscribed to topics ['N', 'E', 'O']
subscribed to topics ['Y', 'G', 'M']
subscribed to topics ['G', 'D', 'I']
subscribed to topics ['D', 'Y', 'W']
subscribed to topics ['A', 'N', 'W']
subscribed to topics ['F', 'K', 'V']
subscribed to topics ['A', 'Q', 'X']
subscribed to topics ['S', 'Y', 'V']
subscribed to topics ['E', 'S', 'D']
Sent topic I
Sent topic N
Sent topic J
Sent topic I
Sent topic A
Sent topic T
Sent topic A
Sent topic K
Sent topic V
Sent topic E

The subscriptions and sending seem OK, but the clients never print anything. The tracebacks for the client processes show them hanging on the sock.recv_json() call. My first attempt (commented out) also hangs.

Rimbaud asked 7/8, 2014 at 17:39
Any particular reason you want to avoid just serializing your dict? - Cortneycorty
@Cortneycorty I may have to resort to that. I guess I was assuming that zmq accomplished subscriber-side topic filtering by including the topic in the message frame. I'm getting the feeling they're just looking at the front of the message content. - Rimbaud
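
To make the diagnosis in this comment thread concrete, here is a minimal sketch (Python 3, no ZeroMQ needed) that mimics what a SUB socket does with a single-frame message: it delivers the message only if one of its subscriptions is a byte prefix of the frame. The helper `sub_accepts` is hypothetical, and I am assuming that `send_json(obj)` transmits `json.dumps(obj)` encoded as UTF-8 in one frame, which is how pyzmq behaves as far as I know. Because a tuple serializes as a JSON array, the frame starts with `[`, so a subscription of `Y` can never match.

```python
import json
import time

def sub_accepts(subscriptions, frame):
    """Mimic ZeroMQ SUB filtering: accept if any subscription is a prefix of the frame."""
    return any(frame.startswith(sub) for sub in subscriptions)

alltopics = {"Y": time.time()}

# Roughly what send_json(("Y", alltopics)) would put on the wire (assumed UTF-8 JSON).
frame = json.dumps(("Y", alltopics)).encode("utf8")
print(frame[:5])                           # b'["Y",'

print(sub_accepts([b"Y"], frame))          # False: the frame starts with '[', not 'Y'
print(sub_accepts([b'["Y",'], frame))      # True: subscribing to the JSON prefix matches
print(sub_accepts([b""], frame))           # True: an empty subscription accepts everything
```

If that assumption about the wire format holds, the original `send_json((topic, alltopics))` / `recv_json()` pair should work once each client subscribes to the JSON prefix instead, e.g. `sock.setsockopt(zmq.SUBSCRIBE, '["{}",'.format(topic))` in the Python 2 code above; `topic, msg = sock.recv_json()` then unpacks the two-element list. The trailing comma keeps `Y` from also matching a longer topic such as `YZ`. The downside is that the filter now depends on how the payload happens to be encoded.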

I'd still like to see it work with send_json() and recv_json() but, per Jason's suggestion, the following is working:

import json

def mogrify(topic, msg):
    """ json encode the message and prepend the topic """
    return topic + ' ' + json.dumps(msg)

def demogrify(topicmsg):
    """ Inverse of mogrify() """
    # msg is a dict, so its JSON encoding begins at the first '{'
    json0 = topicmsg.find('{')
    topic = topicmsg[0:json0].strip()
    msg = json.loads(topicmsg[json0:])
    return topic, msg

with this in publisher():

    sock.send(mogrify(topic, alltopics))

and this in client():

    topic, msg = demogrify(sock.recv())
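
For anyone running this under Python 3, where `sock.send()` needs bytes and `sock.recv()` returns bytes, here is a hedged port of the two helpers. It is my own variant, not the code above: it splits on the first space with `bytes.partition()` instead of searching for `{`, so the payload can be any JSON value, at the cost of assuming topics never contain a space. The names `mogrify3` and `demogrify3` are hypothetical.

```python
import json

def mogrify3(topic, msg):
    """Encode as b'<topic> <json>' for a single-frame PUB message."""
    if " " in topic:
        raise ValueError("topic must not contain a space")
    return topic.encode("utf8") + b" " + json.dumps(msg).encode("utf8")

def demogrify3(topicmsg):
    """Inverse of mogrify3(): split on the first space, then decode the JSON."""
    topic, _, payload = topicmsg.partition(b" ")
    return topic.decode("utf8"), json.loads(payload.decode("utf8"))

wire = mogrify3("A", {"A": 1.5, "B": [1, 2]})
print(wire)              # b'A {"A": 1.5, "B": [1, 2]}'
print(demogrify3(wire))  # ('A', {'A': 1.5, 'B': [1, 2]})
```

In publisher() that would be `sock.send(mogrify3(topic, alltopics))`, and in client() `topic, msg = demogrify3(sock.recv())`; under Python 3 the subscriptions also need to be bytes, e.g. `sock.setsockopt(zmq.SUBSCRIBE, topic.encode("utf8"))`.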

Here's the complete listing followed by some sample output:

#!/usr/bin/env python
# coding: utf8 
"""
Experiments with 0MQ PUB/SUB pattern.
Creates a publisher with 26 topics (A, B, ... Z) and
spawns clients that randomly subscribe to a subset
of the available topics. Console output shows 
who subscribed to what, when topic updates are sent
and when clients receive the messages.
Runs until killed.
Author: Michael Ellis
License: WTFPL
"""
import os
import sys
import time
import zmq
from multiprocessing import Process
from random import sample, choice
import json

PORT = 5566
TOPICS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" # each character is one topic: 'A', 'B', ... 'Z'
PUBSLEEP = 0.01 # Sleep time (seconds) at bottom of publisher() loop.
NCLIENTS = 10  # Number of clients spawned.
NSUBS = 3  # Number of topics each client subscribes to.

assert NSUBS <= len(TOPICS)

def mogrify(topic, msg):
    """ json encode the message and prepend the topic """
    return topic + ' ' + json.dumps(msg)

def demogrify(topicmsg):
    """ Inverse of mogrify() """
    json0 = topicmsg.find('{')
    topic = topicmsg[0:json0].strip()
    msg = json.loads(topicmsg[json0:])
    return topic, msg

def publisher():
    """ Randomly update and publish topics """
    context = zmq.Context()
    sock = context.socket(zmq.PUB)
    sock.bind("tcp://*:{}".format(PORT))
    ## Init a dict of topic:value pairs
    alltopics = dict()
    for char in TOPICS:
        alltopics[char] = time.time()

    while True:
        try:
            topic = choice(TOPICS)
            alltopics[topic] = time.time()
            sock.send(mogrify(topic, alltopics))
            print "Sent topic {}".format(topic)
            time.sleep(PUBSLEEP)
        except KeyboardInterrupt:
            sys.exit()

def client(number, topics):
    """ 
    Subscribe to list of topics and wait for messages. 
    """
    context = zmq.Context()
    sock = context.socket(zmq.SUB)
    sock.connect("tcp://localhost:{}".format(PORT))
    for topic in topics:
        sock.setsockopt(zmq.SUBSCRIBE, topic)

    print "subscribed to topics {}".format(topics)    

    while True:
        try:
            topic, msg = demogrify(sock.recv())
            print  "Client{}  {}:{}".format(number, topic, msg[topic])
            sys.stdout.flush()
        except KeyboardInterrupt:
            sys.exit()

_procd = dict()
def run():
    """ Spawn publisher and clients. Loop until terminated. """
    ## Launch publisher
    name = 'publisher'
    _procd[name] = Process(target=publisher, name=name)
    _procd[name].start()

    ## Launch the subscribers
    for n in range(NCLIENTS):
        name = 'client{}'.format(n)
        _procd[name] = Process(target=client,
                               name=name,
                               args=(n, sample(TOPICS, NSUBS)))
        _procd[name].start()


    ## Sleep until killed
    while True:
        time.sleep(1)          

if __name__ == '__main__':
    import signal
    def handler(signum, frame):
        """ Handler for SIGTERM """
        # kill the processes we've launched
        try:
            for _, proc in _procd.iteritems():
                if proc and proc.is_alive():
                    proc.terminate()
        finally:            
            sys.exit()

    signal.signal(signal.SIGTERM, handler)

    run()

Sample Output

$ pubsub.py 
Sent topic Q
subscribed to topics ['R', 'G', 'S']
subscribed to topics ['J', 'K', 'C']
subscribed to topics ['L', 'B', 'P']
subscribed to topics ['X', 'Z', 'A']
subscribed to topics ['K', 'O', 'R']
subscribed to topics ['J', 'Z', 'T']
subscribed to topics ['R', 'G', 'P']
subscribed to topics ['Y', 'A', 'O']
subscribed to topics ['U', 'S', 'C']
subscribed to topics ['B', 'P', 'L']
Sent topic U
Client8  U:1407506576.27
Sent topic E
Sent topic A
Client3  A:1407506576.29
Client7  A:1407506576.29
Sent topic A
Client3  A:1407506576.31
Client7  A:1407506576.31
Sent topic G
Client0  G:1407506576.32
Client6  G:1407506576.32
Sent topic E
Sent topic B
Client2  B:1407506576.34
Client9  B:1407506576.34
Sent topic R
Client0  R:1407506576.35
Client6  R:1407506576.35
Client4  R:1407506576.35
Sent topic U
Client8  U:1407506576.36
...
Rimbaud answered 7/8, 2014 at 19:37
Instead of simply looking for the first {, you might want to follow the example in the docs, zguide.zeromq.org/page:all#Pub-Sub-Message-Envelopes: send_multipart([topic, json.dumps(msg)]) and [topic, json_data] = recv_multipart(); msg = json.loads(json_data) - Delacroix
That doesn't work; you'll get a memoryview error. - Neelon
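
The envelope approach from the first comment can probably be made to work under Python 3. My assumption (not verified against the commenter's setup) is that the memoryview error comes from handing `str` objects to `send_multipart()`, which expects bytes-like frames. A minimal sketch of bytes-only helpers, with the hypothetical names `pack` and `unpack`:

```python
import json

def pack(topic, msg):
    """Build [topic_frame, json_frame] as bytes, suitable for sock.send_multipart()."""
    return [topic.encode("utf8"), json.dumps(msg).encode("utf8")]

def unpack(frames):
    """Inverse of pack(): takes the list of bytes returned by sock.recv_multipart()."""
    topic, json_data = frames
    return topic.decode("utf8"), json.loads(json_data.decode("utf8"))

# Publisher side (sketch): sock.send_multipart(pack(topic, alltopics))
# Client side (sketch):    topic, msg = unpack(sock.recv_multipart())
frames = pack("A", {"A": 1407506576.27})
print(frames)          # [b'A', b'{"A": 1407506576.27}']
print(unpack(frames))  # ('A', {'A': 1407506576.27})
```

Because the topic travels in its own frame, the client's `SUBSCRIBE` filter only ever sees the topic bytes, and the JSON payload may start with any character.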

I believe what you're after is send_multipart() rather than send_json(). In ZeroMQ parlance, a message with multiple frames is a multipart message, and the first frame is what determines the "subscriber topic" for pub/sub: a SUB socket delivers a message only if one of its subscriptions is a prefix of that first frame. If you send only a single frame, the topic is therefore just whatever the string begins with.
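
To illustrate the first-frame rule, the sketch below emulates the subscriber-side decision for a multipart message. `delivered` is a hypothetical helper, not a pyzmq call, and it models only the prefix test described for `ZMQ_SUBSCRIBE`. Because the match is a prefix match, a subscription to `A` also accepts a topic such as `AB`; that is harmless with the single-letter topics used here, but worth keeping in mind with longer names.

```python
def delivered(subscriptions, frames):
    """Emulate a SUB socket: only the first frame is compared, by prefix."""
    first = frames[0]
    return any(first.startswith(sub) for sub in subscriptions)

subs = [b"A", b"Q"]
print(delivered(subs, [b"A", b'{"A": 1.0}']))  # True
print(delivered(subs, [b"B", b'{"A": 1.0}']))  # False: the payload frame is never inspected
print(delivered(subs, [b"AB", b"{}"]))         # True: prefix match, not equality
```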

Cortneycorty answered 7/8, 2014 at 21:57
Thanks. I looked at the docs and searched for some examples. AFAICT, I'd still need to do the JSON encoding manually, so for my application it doesn't seem like much of a win over the mogrify/demogrify functions I posted yesterday. Still, it's worth knowing multipart exists; it might be useful in the future. - Rimbaud
If you need a dict structure on your pub to show up as a dict structure on the sub, then that's probably the best way to go. Perhaps I didn't look closely enough; it looked like you were using the dict just as a structure to create message frames, where the key would be the topic and the value would be the message. If that's what you intended, you could simply follow the strategy here to get it done. - Cortneycorty
Yes. In my real application, the publisher maintains a relatively small dict representing the overall application state. Some processes update it with info from external sources; others need to know when something has changed in order to update displays, etc. The PUB/SUB pattern is useful because it blocks on the subscriber end. Using topics minimizes the rate at which the client processes have to wake up and do something. Sending the dict structure as JSON means it can be interpreted by JavaScript as well as Python. - Rimbaud
That makes total sense. If you want to do it the "ZMQ way", then I'd serialize your dict and use send_multipart() to properly use message frames for the subscriber topic. But your solution appears equally workable. Very interesting architecture, by the way, I like it :) - Cortneycorty
