Python + ZMQ: Operation cannot be accomplished in current state

I am trying to get a Python program to communicate with another Python program via ZeroMQ using the request-reply pattern. The client program should send a request to the server program, which replies.

I have two servers, so that when one server fails, the other takes over. Communication works perfectly while the first server is working. However, when the first server fails and I make a request to the second server, I see the error:

zmq.error.ZMQError: Operation cannot be accomplished in current state

Code of server 1:

import zmq

context = zmq.Context()

# Run the server
while True:

    # Define the socket using the "Context"
    sock = context.socket(zmq.REP)
    sock.bind("tcp://127.0.0.1:5677")
    data = sock.recv().decode("utf-8")
    res = "Recvd"
    sock.send(res.encode('utf-8'))

Code of server 2:

import zmq

context = zmq.Context()

# Run the server
while True:

    # Define the socket using the "Context"
    sock = context.socket(zmq.REP)
    sock.bind("tcp://127.0.0.1:5877")
    data = sock.recv().decode("utf-8")
    res = "Recvd"
    sock.send(res.encode('utf-8'))

Code of the client:

# ZeroMQ context for distributed messaging amongst processes
context = zmq.Context()
sock_1 = context.socket(zmq.REQ)
sock_2 = context.socket(zmq.REQ)
sock_1.connect("tcp://127.0.0.1:5677")
sock_2.connect("tcp://127.0.0.1:5877")

try:
    sock_1.send(data.encode('utf-8'), zmq.NOBLOCK)
    socks_1.setsockopt(zmq.RCVTIMEO, 1000)
    socks_1.setsockopt(zmq.LINGER, 0)
    data = socks_1.recv().decode('utf-8') #receive data from the main node  

except:
    try:
        #when server one fails
        sock_2.send(data.encode('utf-8'), zmq.NOBLOCK)
        socks_2.setsockopt(zmq.RCVTIMEO, 1000)
        socks_2.setsockopt(zmq.LINGER, 0)
        data = socks_2.recv().decode('utf-8')
    except Exception as e:
         print(str(e))

What is the problem with this approach? How can I resolve this?

Bayonet asked 7/12, 2016 at 5:16
Comments:
Why do your client and server both bind to the same port on the loopback? Should one of them connect instead? – Birdwell
Oh, sorry, I made that mistake while copying the code here. Fixed it. – Bayonet
Is the second server's socket open? A complete stack trace from when the exception happens would help in understanding the problem. – Birdwell
Yes, the second server is running and does receive the data from the client. The problem is the receive part: the stack trace points to the line data = socks_2.recv().decode('utf-8') and then to main. – Bayonet
What is the difference between socks_2 and sock_2, and between socks_1 and sock_1? – Birdwell
The server code creates a new REP socket on each pass through the loop without closing the previous one. I'd expect this to fail (address already in use) on the second pass through the loop. – Clutch
Implement the Lazy Pirate pattern: create a new socket from your context when an error is caught, before trying to send the message again. – Disagreement

Q: How can I resolve this?
A: Avoid the known risk of REQ/REP deadlocking!

While ZeroMQ is a powerful framework, understanding its internal composition is necessary for robust and reliable distributed-system design and prototyping.

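On closer inspection, the common REQ/REP Formal Communication Pattern can leave (and here does leave) the counterparties in a mutual deadlock: one side is waiting for the other to take a step that will never be taken, and there is no way to escape the deadlocked state. Concretely, a REQ socket enforces a strict send()/recv() alternation. Once a request has been sent and its reply never arrives (for example, because the server died and recv() timed out), any further send() on that same socket raises ZMQError with errno EFSM, which is exactly "Operation cannot be accomplished in current state".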

For more illustrated details and an FSA-schematic diagram, see this post.
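A minimal sketch, assuming pyzmq with a libzmq 4.x backend, that reproduces the error from the question in isolation: a REQ socket that calls send() a second time without an intervening recv() gets a ZMQError whose errno is EFSM. The helper name `second_send_errno` and the inproc endpoint name are hypothetical, chosen only for the demonstration.

```python
import zmq


def second_send_errno(endpoint="inproc://fsm-demo"):
    """Return the errno raised when a REQ socket sends twice without recv()."""
    ctx = zmq.Context()
    rep = ctx.socket(zmq.REP)
    rep.bind(endpoint)                   # hypothetical in-process endpoint
    req = ctx.socket(zmq.REQ)
    req.connect(endpoint)
    try:
        req.send(b"first request")       # REQ now waits for a reply
        try:
            req.send(b"second request")  # illegal: recv() must come first
        except zmq.ZMQError as e:
            print(e)                     # Operation cannot be accomplished in current state
            return e.errno
        return None
    finally:
        # Close both sockets without lingering so the context can terminate.
        req.close(linger=0)
        rep.close(linger=0)
        ctx.term()


if __name__ == "__main__":
    print(second_send_errno() == zmq.EFSM)
```

The same state rule is why reusing a REQ socket after a timed-out recv() fails: the socket is still in its "waiting for a reply" state.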

Next, a fail-over system has to survive the collapse of any of its own components. One therefore has to design the distributed system's state signalling well and avoid as many dependencies on per-element FSA design, stepping, and blocking as possible; otherwise, the fail-safe behaviour remains just an illusion.
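A minimal client-side fail-over sketch, assuming pyzmq. Instead of reusing a REQ socket whose request timed out (which leaves it stuck and produces EFSM on the next send()), each attempt opens a fresh REQ socket with LINGER set to 0, waits for the reply with a bounded Socket.poll(), and moves on to the next endpoint on timeout. The function name `request_with_failover` and its parameters are hypothetical.

```python
import zmq


def request_with_failover(endpoints, payload, timeout_ms=1000, context=None):
    """Send payload to each endpoint in turn; return (endpoint, reply) from the first that answers.

    A fresh REQ socket is used per attempt, so a socket stuck waiting for a
    reply is never reused. Raises TimeoutError if no endpoint replies in time.
    """
    context = context or zmq.Context.instance()
    for endpoint in endpoints:
        sock = context.socket(zmq.REQ)
        sock.setsockopt(zmq.LINGER, 0)          # discard pending messages on close
        sock.connect(endpoint)
        try:
            sock.send(payload)
            if sock.poll(timeout_ms, zmq.POLLIN):  # bounded wait for the reply
                return endpoint, sock.recv()
            # No reply in time: close this socket and try the next server.
        finally:
            sock.close()
    raise TimeoutError("no server replied")
```

For the question's setup, the endpoint list would be ["tcp://127.0.0.1:5677", "tcp://127.0.0.1:5877"]. Opening one socket per attempt trades a small allocation cost for never depending on a REQ socket's internal state after a failure.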

Always handle resources with care. Do not treat the components of ZeroMQ smart signalling/messaging as "expendable disposables"; that may be tolerated in textbook examples, but not in production environments. You still pay the costs (time, resource allocation and de-allocation, garbage collection). As noted in the comments, never let resource creation/allocation happen without due control: while True: .socket(); .bind(); .send(); is brutally wrong in principle and undermines the rest of the design.
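A minimal sketch of the question's REP server rewritten along those lines, assuming pyzmq: the socket is created and bound once, outside the loop, every recv() is answered by exactly one send(), and the socket is closed on exit. The `serve` function and its `max_requests` parameter are hypothetical, added only so the loop can terminate.

```python
import zmq


def serve(endpoint="tcp://127.0.0.1:5677", max_requests=None):
    """Run a REP server whose socket is created and bound only once."""
    context = zmq.Context.instance()
    sock = context.socket(zmq.REP)   # created once, outside the loop
    sock.bind(endpoint)
    handled = 0
    try:
        while max_requests is None or handled < max_requests:
            data = sock.recv().decode("utf-8")   # process data here
            sock.send("Recvd".encode("utf-8"))   # exactly one reply per request
            handled += 1
    finally:
        sock.close(linger=0)             # release the socket and its endpoint
    return handled
```

Server 2 would be the same function called with "tcp://127.0.0.1:5877".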

Calomel answered 7/12, 2016 at 10:48
Comment:
I'm somewhat surprised this answer has the most votes, as it doesn't seem to concretely answer the question at hand, but rather contains a lot of high-level and abstract advice. Apparently this is helpful to some people, but it doesn't help me solve my problem at the level of implementation. – Dentil

On the server side, the recv/send pair is critical. I was facing a similar issue because a socket.send() call was missing.

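import json
import logging
import zmq

logger = logging.getLogger(__name__)
socket_ = zmq.Context.instance().socket(zmq.REP)
socket_.bind("tcp://127.0.0.1:5677")  # example endpoint (assumption)
counter = 0  # assumed to be incremented elsewhere in the original program

def zmq_listen():
    global counter
    message = socket_.recv().decode("utf-8")
    logger.info(f"[{counter}] Message: {message}")
    request = json.loads(message)
    request["msg_id"] = f"m{counter}"
    ack = {"msg_id": request["msg_id"]}
    socket_.send(json.dumps(ack).encode("utf-8"))  # one reply per recv() keeps REP in a valid state
    return request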
Monteverdi answered 7/6, 2020 at 13:22

Implement the Lazy Pirate pattern: create a new socket from your context when an error is caught, before trying to send the message again.

A pretty good brute-force solution is to close and reopen the REQ socket after an error.

Here is a Python example.

#
#   Author: Daniel Lundin <dln(at)eintr(dot)org>
#
from __future__ import print_function

import zmq

REQUEST_TIMEOUT = 2500
REQUEST_RETRIES = 3
SERVER_ENDPOINT = "tcp://localhost:5555"

context = zmq.Context(1)

print("I: Connecting to server…")
client = context.socket(zmq.REQ)
client.connect(SERVER_ENDPOINT)

poll = zmq.Poller()
poll.register(client, zmq.POLLIN)

sequence = 0
retries_left = REQUEST_RETRIES
while retries_left:
    sequence += 1
    request = str(sequence).encode()
    print("I: Sending (%s)" % request)
    client.send(request)

    expect_reply = True
    while expect_reply:
        socks = dict(poll.poll(REQUEST_TIMEOUT))
        if socks.get(client) == zmq.POLLIN:
            reply = client.recv()
            if not reply:
                break
            if int(reply) == sequence:
                print("I: Server replied OK (%s)" % reply)
                retries_left = REQUEST_RETRIES
                expect_reply = False
            else:
                print("E: Malformed reply from server: %s" % reply)

        else:
            print("W: No response from server, retrying…")
            # Socket is confused. Close and remove it.
            client.setsockopt(zmq.LINGER, 0)
            client.close()
            poll.unregister(client)
            retries_left -= 1
            if retries_left == 0:
                print("E: Server seems to be offline, abandoning")
                break
            print("I: Reconnecting and resending (%s)" % request)
            # Create new connection
            client = context.socket(zmq.REQ)
            client.connect(SERVER_ENDPOINT)
            poll.register(client, zmq.POLLIN)
            client.send(request)

context.term()
Disagreement answered 12/11, 2018 at 12:54
