pyzmq non-blocking socket
Asked Answered
V

2

6

Can someone point me to an example of a REQ/REP non-blocking ZeroMQ (0MQ) with Python bindings? Perhaps my understanding of ZMQ is faulty but I couldn't find an example online.

I have a server in Node.JS that sends work from multiple clients to the server. The idea is that the server can spin up a bunch of jobs that operate in parallel instead of processing data for one client followed by the next

Vicissitude answered 18/9, 2012 at 1:16 Comment(0)
D
2

You can use for this goal both zmq.Poller (many examples you can find in zguide repo, eg rrbroker.py) or gevent-zeromq implementation (code sample).

Decato answered 18/9, 2012 at 3:3 Comment(0)
C
1

The example provided in the accepted answer gives the gist of it, but you can get away with something a bit simpler as well by using zmq.device for the broker while otherwise sticking to the "Extended Request-Reply" pattern from the guide. As such, a hello worldy example for the server could look something like the following:

import time
import threading
import zmq

context = zmq.Context()

def worker():
    socket = context.socket(zmq.REP)
    socket.connect('inproc://workers')
    while True:
        msg = socket.recv_string()
        print(f'Received request: [{msg}]')
        time.sleep(1)
        socket.send_string(msg)

url_client = 'tcp://*:5556'
clients = context.socket(zmq.ROUTER)
clients.bind(url_client)
workers = context.socket(zmq.DEALER)
workers.bind('inproc://workers')

for _ in range(4):
    thread = threading.Thread(target=worker)
    thread.start()

zmq.device(zmq.QUEUE, clients, workers)

Here we're letting four workers handle incoming requests in parallel. Now, you're using Node on the client side, but just to keep the example complete, one can use the Python client below to see that this works. Here, we're creating 10 requests which will then be handled in 3 batches:

import zmq
import threading

context = zmq.Context()

def make_request(a):
    socket = context.socket(zmq.REQ)
    socket.connect('tcp://localhost:5556')
    print(f'Sending request {a} ...')
    socket.send_string(str(a))
    message = socket.recv_string()
    print(f'Received reply from request {a} [{message}]')

for a in range(10):
    thread = threading.Thread(target=make_request, args=(a,))
    thread.start()
Caro answered 10/3, 2019 at 21:3 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.