Why ZeroMQ fails to communicate when I use multiprocessing.Process to run?
Please see the code below:


server.py

import zmq 
import time
from multiprocessing import Process
class A:
  def __init__(self):
    ctx = zmq.Context(1)
    sock = zmq.Socket(ctx, zmq.PUB)
    sock.bind('ipc://test')
    p = Process(target=A.run, args=(sock,))
    p.start()     # the child process calls run, but the client receives nothing
    p.join()
    #A.run(sock)  # calling run directly works: the client receives the messages

  @staticmethod
  def run(sock):
    while True:
      sock.send('demo'.encode('utf-8'))
      print('sent')
      time.sleep(1)

if __name__ =='__main__':
  a = A()

client.py

import zmq 
ctx=zmq.Context(1)
sock = zmq.Socket(ctx, zmq.SUB)
sock.connect('ipc://test')
sock.setsockopt_string(zmq.SUBSCRIBE, '') 
while True:
  print(sock.recv())

In the constructor in server.py, if I call the run() method directly, the client receives the messages, but when I go through multiprocessing.Process, it does not. Can anyone explain this and offer some advice?

Punt answered 10/8, 2020 at 2:57 Comment(0)
G
2

Q : "Why ZeroMQ fails to communicate when I use multiprocessing.Process to run?"

Well, ZeroMQ does not fail to communicate; the problem is how the Python multiprocessing module "operates".

The module is designed so that some processing can escape Python's central GIL lock (a re-[SERIAL]-iser that acts as an ever-present avoider of [CONCURRENT] situations).

This means that a call to multiprocessing.Process makes an exact "mirror copy" of the Python interpreter state, "exported" into a new O/S-spawned process (details depend on the localhost O/S).

Given that, there is zero chance a "mirror"-ed replica could get access to resources already owned by __main__. Here the .bind()-method has already acquired the ipc://test address, so the "remote" process will never get "permission" to touch this ZeroMQ AccessPoint unless the code is repaired and fully refactored.

Q : "Can anyone explain on this and provide some advice?"

Sure. The best first step is to fully understand the Pythonic culture of monopolistic GIL-lock re-[SERIAL]-isation, where no two things ever happen at the same time, so even adding more threads does not speed up the processing, as it all gets re-aligned by the central "monopolist", the GIL lock.

Next, a fully reflected copy of the Python interpreter state, while it sounds promising, also has some obvious drawbacks: the new processes, being "mirror" copies, cannot introduce collisions on already-owned resources. If they try to, code that does not work as expected is the milder of the problems in such principally ill-designed cases.
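The "mirror copy" behaviour can be seen in a small standard-library sketch. It assumes a POSIX host where the fork start method is available, and the names state, child and demo are made up for illustration. The child sees the parent's state as it was when the process was started, and whatever it changes stays in its own copy:

```python
import multiprocessing as mp

# Module-level state that exists before the child process is started.
state = {"owner": "parent"}


def child(conn):
    # The child starts with a snapshot of the parent's state...
    seen = state["owner"]
    # ...and this assignment only changes the child's private copy.
    state["owner"] = "child"
    conn.send((seen, state["owner"]))
    conn.close()


def demo():
    # Assumption: a POSIX system, so the "fork" start method exists.
    ctx = mp.get_context("fork")
    parent_end, child_end = ctx.Pipe()
    p = ctx.Process(target=child, args=(child_end,))
    p.start()
    seen_by_child = parent_end.recv()
    p.join()
    # The parent's own copy was never touched by the child.
    return seen_by_child, state["owner"]


if __name__ == "__main__":
    print(demo())
```

Nothing the child does to its copy flows back to __main__, which is why anything that has to be shared between the two must travel over an explicit channel (here a Pipe, in the question a ZeroMQ socket).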

In your code, the first line in __main__ instantiates a = A(), where A's .__init__ method immediately occupies the IPC resource via .bind('ipc://test'). The later step, p = Process( target = A.run, args = ( sock, ) ), "mirror"-replicates the state of the Python interpreter (an as-is copy), and p.start() cannot but fail to "own" the same resource that __main__ already owns (yes, the ipc://test, which the "mirror"-ed process is instructed to grab again as a non-free resource in .bind('ipc://test') ). This will never fly.
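One possible repair, sketched under assumptions rather than offered as the definitive fix: let the child process acquire the ZeroMQ resources itself, so the Context, the PUB socket and the .bind() all happen inside run(). The sketch assumes pyzmq on a POSIX host with the fork start method. The temporary ipc path, the finite message count and the receive_one() helper are made up here so that the example terminates:

```python
import multiprocessing as mp
import os
import tempfile
import time

import zmq


def run(endpoint, count):
    # Context, socket and bind are all created inside the child process.
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.bind(endpoint)
    for _ in range(count):
        sock.send(b"demo")
        time.sleep(0.05)
    sock.close(linger=0)
    ctx.term()


def receive_one(count=60, timeout_ms=5000):
    with tempfile.TemporaryDirectory() as tmp:
        endpoint = "ipc://" + os.path.join(tmp, "test")
        # Start the child first; __main__ never owns the publisher socket.
        p = mp.get_context("fork").Process(target=run, args=(endpoint, count))
        p.start()
        ctx = zmq.Context()
        sock = ctx.socket(zmq.SUB)
        sock.setsockopt(zmq.RCVTIMEO, timeout_ms)
        sock.setsockopt_string(zmq.SUBSCRIBE, "")
        sock.connect(endpoint)
        try:
            return sock.recv()
        except zmq.Again:
            # Nothing arrived within the timeout.
            return None
        finally:
            sock.close(linger=0)
            ctx.term()
            p.join()


if __name__ == "__main__":
    print(receive_one())
```

The subscriber here plays the role of client.py. A PUB socket drops whatever it sends before a subscriber has connected, so the first few messages are expected to be lost.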

Last but not least, enjoy the Zen-of-Zero, the masterpiece of Martin SUSTRIK for ZeroMQ, so well crafted as an ultimately scalable, almost zero-latency, very comfortable, widely ported signalling & messaging framework.

Graeme answered 10/8, 2020 at 8:48 Comment(0)
M
4

Short answer: Start your subprocesses. Create your zmq.Context and zmq.Socket instances from within the Producer.run() method, inside each subprocess. Use the .bind()-method on the side whose cardinality is 1, and the .connect()-method on the side whose cardinality is >1 (in this case, the "server").

My approach would be structured something like...

# server.py :

    import zmq
    from multiprocessing import Process

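    class Producer(Process):

      def __init__(self):
        super().__init__()  # Process.__init__ must run before start()
        ...

      def run(self):
        # Runs in the child process: create the context and socket here
        ctx = zmq.Context(1)
        sock = zmq.Socket(ctx, zmq.PUB)
        # Multiple producers, so connect instead of bind (consumer must bind)
        sock.connect('ipc://test')
        while True:
          ...

    if __name__ == "__main__":
      # Producer is itself a Process: start() invokes run() in the child
      producer = Producer()
      producer.start()
      producer.join()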

# client.py :

    import zmq

    ctx = zmq.Context(1)
    sock = zmq.Socket(ctx, zmq.SUB)
    # Capture from multiple producers, so bind (producers must connect)
    sock.bind('ipc://test')
    sock.setsockopt_string(zmq.SUBSCRIBE, '') 
    while True:
      print(sock.recv())
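For a version that can be run end to end, here is a self-contained sketch of the same layout: several producers, each creating its own context and socket and calling .connect(), with a single consumer that calls .bind(). It assumes pyzmq on a POSIX host with the fork start method. The produce() and collect() helpers, the temporary ipc path, the producer names and the finite message count are all invented for the example:

```python
import multiprocessing as mp
import os
import tempfile
import time

import zmq


def produce(endpoint, name, count):
    # Each producer builds its own context and socket in its own process.
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.connect(endpoint)  # many producers, so they connect
    for _ in range(count):
        sock.send_string(name)
        time.sleep(0.05)
    sock.close(linger=0)
    ctx.term()


def collect(n_producers=2, count=60, timeout_ms=5000):
    with tempfile.TemporaryDirectory() as tmp:
        endpoint = "ipc://" + os.path.join(tmp, "test")
        mp_ctx = mp.get_context("fork")
        procs = [
            mp_ctx.Process(target=produce, args=(endpoint, f"producer-{i}", count))
            for i in range(n_producers)
        ]
        # Start the subprocesses before the consumer creates its context.
        for p in procs:
            p.start()
        ctx = zmq.Context()
        sock = ctx.socket(zmq.SUB)
        sock.setsockopt(zmq.RCVTIMEO, timeout_ms)
        sock.setsockopt_string(zmq.SUBSCRIBE, "")
        sock.bind(endpoint)  # single consumer, so it binds
        seen = set()
        try:
            while len(seen) < n_producers:
                seen.add(sock.recv_string())
        except zmq.Again:
            pass  # timed out before hearing from every producer
        finally:
            sock.close(linger=0)
            ctx.term()
            for p in procs:
                p.join()
        return seen


if __name__ == "__main__":
    print(sorted(collect()))
```

The producers connect before the consumer has bound the address. That is fine for ipc endpoints, because ZeroMQ keeps retrying the connection in the background, but anything published before the subscription is in place is dropped.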

Milfordmilhaud answered 11/8, 2020 at 4:14 Comment(2)
Sorry, this will never fly: your promise to create a working solution "within each subprocess" is wrong and will fail by colliding with the already-owned resource (each subsequent .bind() will fail once the first one succeeds in owning the address). - Graeme
You are correct; I overlooked the bind/connect direction in my focus on the context and socket. I will update my answer. - Milfordmilhaud