I've just started with ZeroMQ and I'm trying to get a Hello World working with PyZMQ and asyncio in Python 3.6. I'm trying to decouple a module's functionality from the pub/sub code, hence the following class setup:
Edit 1: Minimized example
Edit 2: Included the solution; see the answer below for how.
import asyncio
import zmq.asyncio
from zmq.asyncio import Context


# manages message flow between publishers and subscribers
class HelloWorldMessage:
    def __init__(self, url='127.0.0.1', port='5555'):
        self.url = "tcp://{}:{}".format(url, port)
        self.ctx = Context.instance()

        # activate publishers / subscribers
        asyncio.get_event_loop().run_until_complete(asyncio.wait([
            self.pub_hello_world(),
            self.sub_hello_world(),
        ]))

    # generates message "Hello World" and publish to topic 'world'
    async def pub_hello_world(self):
        pub = self.ctx.socket(zmq.PUB)
        pub.connect(self.url)

        # message contents
        msg = "Hello World"
        print(msg)

        # keep sending messages
        while True:
            # --MOVED-- slow down message publication
            await asyncio.sleep(1)

            # publish message to topic 'world'
            # async always needs `send_multipart()`
            await pub.send_multipart([b'world', msg.encode('ascii')])  # WRONG: bytes(msg)

    # processes message "Hello World" from topic 'world'
    async def sub_hello_world(self):
        sub = self.ctx.socket(zmq.SUB)
        sub.bind(self.url)
        sub.setsockopt(zmq.SUBSCRIBE, b'world')

        # keep listening to all published message on topic 'world'
        while True:
            msg = await sub.recv_multipart()
            # ERROR: WAITS FOREVER
            print('received: ', msg)


if __name__ == '__main__':
    HelloWorldMessage()
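The `# WRONG: bytes(msg)` note in the code above can be checked in isolation. This is a minimal sketch that does not use pyzmq at all; it only shows how a `str` is converted to a bytes frame in Python 3. The variable names `frame`, `error`, `encoded` and `also_encoded` are illustrative and not part of the original code.

```python
msg = "Hello World"

# In Python 3, bytes() called on a str without an encoding raises TypeError.
try:
    frame = bytes(msg)
except TypeError as exc:
    frame = None
    error = str(exc)

# Either of these yields a bytes object suitable as a message frame.
encoded = msg.encode("ascii")
also_encoded = bytes(msg, "ascii")

print("bytes(msg) failed with:", error)
print(encoded == also_encoded)
```

Inside a coroutine, a `TypeError` like this does not necessarily reach the console, which may be why the failure was hard to spot.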
Problem
With the above code, only one "Hello World" is printed and then the program waits forever. If I press Ctrl+C, I get the following traceback:
python helloworld_pubsub.py
Hello World
^CTraceback (most recent call last):
  File "helloworld_pubsub_stackoverflow.py", line 64, in <module>
    HelloWorldMessage()
  File "helloworld_pubsub_stackoverflow.py", line 27, in __init__
    self.sub_hello_world(),
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 1395, in _run_once
    event_list = self._selector.select(timeout)
  File "/*path*/zeromq/lib/python3.6/selectors.py", line 445, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
Versions: libzmq: 4.2.3, pyzmq: 17.0.0, Ubuntu 16.04
Any insights are appreciated.
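One possible reading of the hang, assuming the failing version called `bytes(msg)` as the `# WRONG` comment suggests: `asyncio.wait()` does not re-raise exceptions from the tasks it waits on, so the publisher could die silently while the subscriber keeps the loop idle. The sketch below reproduces that behaviour without pyzmq. `failing_publisher`, `waiting_subscriber`, `run_with_wait` and `run_with_gather` are hypothetical stand-ins, and the 0.2 s timeout is added only so the example terminates. It uses `asyncio.run()`, which requires Python 3.7+; on 3.6, `loop.run_until_complete()` plays the same role.

```python
import asyncio


async def failing_publisher():
    # Stand-in for pub_hello_world(): fails the way bytes("Hello World") does.
    await asyncio.sleep(0)
    bytes("Hello World")  # raises TypeError in Python 3


async def waiting_subscriber(received):
    # Stand-in for sub_hello_world(): waits for a message that never arrives.
    # The timeout is an assumption added so that this sketch finishes.
    await asyncio.wait_for(received.wait(), timeout=0.2)


async def run_with_wait():
    received = asyncio.Event()
    tasks = [
        asyncio.ensure_future(failing_publisher()),
        asyncio.ensure_future(waiting_subscriber(received)),
    ]
    # asyncio.wait() returns normally; errors stay stored on the tasks.
    await asyncio.wait(tasks)
    return [type(task.exception()).__name__ for task in tasks]


async def run_with_gather():
    received = asyncio.Event()
    try:
        # asyncio.gather() propagates the first exception to the caller.
        await asyncio.gather(failing_publisher(), waiting_subscriber(received))
    except TypeError as exc:
        return "raised: " + type(exc).__name__
    return "no error"


print(asyncio.run(run_with_wait()))
print(asyncio.run(run_with_gather()))
```

If this is what happened, swapping `asyncio.wait` for `asyncio.gather` while debugging, or inspecting each task's `exception()`, would surface the error instead of leaving the loop blocked in `select`.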
asyncio.get_event_loop() ... from my code to run the pub/sub. Unfortunately I get the following errors: "ERR[3&7]: No such device", "ERR[4]: Invalid argument", "ERR[8]: Resource temporarily unavailable". And it's stuck on "INF: await .recv_multipart() about to be called now:". Even when I changed the port to 8080 or 8081, I'm getting the same error. Could you also explain 2) more? Or a link to some documentation? – Bellied