Problem Statement
After the Gunicorn worker processes have booted, I want them to still be able to receive data from another process. Currently, I'm trying to use multiprocessing.Queue to achieve this. Specifically, I start a data management process before the workers are forked and use two queues to connect it with the workers: one queue carries the workers' requests to the data management process, and the other carries its responses back. In the post_fork hook, a worker puts a request on the request queue, waits for a response on the response queue, and only then proceeds to serve the application.
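The exchange described above can be sketched as a standalone script, outside Gunicorn, with a single os.fork() standing in for a worker being booted. This is a minimal sketch assuming a POSIX system with the fork start method; the names serve and one_exchange and the None shutdown sentinel are illustrative only, not part of my real code.

```python
import multiprocessing
import os


def serve(req_q, resp_q) -> None:
    # Data management loop: answer every request with this process's PID.
    while True:
        requester_pid = req_q.get()
        if requester_pid is None:  # hypothetical shutdown sentinel
            break
        resp_q.put(os.getpid())


def one_exchange() -> tuple:
    ctx = multiprocessing.get_context("fork")
    manager = ctx.Manager()
    req_q, resp_q = manager.Queue(), manager.Queue()
    data_proc = ctx.Process(target=serve, args=(req_q, resp_q), daemon=True)
    data_proc.start()

    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:  # forked child, playing the role of a worker in post_fork
        os.close(read_fd)
        req_q.put(os.getpid())    # send a request
        responder = resp_q.get()  # block until the data process answers
        os.write(write_fd, str(responder).encode())
        os._exit(0)  # leave without running atexit handlers inherited from the parent

    # Parent: read the responder PID reported by the child.
    os.close(write_fd)
    with os.fdopen(read_fd) as f:
        responder = int(f.read())
    os.waitpid(pid, 0)

    # Stop the data process, then the manager.
    req_q.put(None)
    data_proc.join()
    manager.shutdown()
    return responder, data_proc.pid


if __name__ == "__main__":
    responder, data_pid = one_exchange()
    print(responder == data_pid)
```

The child leaves with os._exit() so that it skips the interpreter's normal shutdown, including any atexit handlers it inherited from the parent; the parent only collects the result over a pipe.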
This works fine at first. However, when I manually terminate the worker and Gunicorn restarts it, the new worker gets stuck in the post_fork hook and never receives a response from the data management process.
Minimal Example
The following code shows a minimal example (config.py):
import logging
import os
import multiprocessing

logging.basicConfig(level=logging.INFO)

bind = "localhost:8080"
workers = 1


def s(req_q: multiprocessing.Queue, resp_q: multiprocessing.Queue):
    # Data management loop: answer each request with this process's PID.
    while True:
        logging.info("Waiting for messages")
        other_pid = req_q.get()
        logging.info("Got a message from %d", other_pid)
        resp_q.put(os.getpid())


# Runs in the Gunicorn master when the config file is loaded,
# i.e. before any worker is forked.
m = multiprocessing.Manager()
q1 = m.Queue()
q2 = m.Queue()
proc = multiprocessing.Process(target=s, args=(q1, q2), daemon=True)
proc.start()


def post_fork(server, worker):
    # Runs in each newly forked worker before it starts serving requests.
    logging.info("Sending request")
    q1.put(os.getpid())
    logging.info("Request sent")
    other_pid = q2.get()
    logging.info("Got response from %d", other_pid)
My application module (app.py) is:
from flask import Flask
app = Flask(__name__)
And I start the server via
$ gunicorn -c config.py app:app
INFO:root:Waiting for messages
[2023-01-31 14:20:46 +0800] [24553] [INFO] Starting gunicorn 20.1.0
[2023-01-31 14:20:46 +0800] [24553] [INFO] Listening at: http://127.0.0.1:8080 (24553)
[2023-01-31 14:20:46 +0800] [24553] [INFO] Using worker: sync
[2023-01-31 14:20:46 +0800] [24580] [INFO] Booting worker with pid: 24580
INFO:root:Sending request
INFO:root:Request sent
INFO:root:Got a message from 24580
INFO:root:Waiting for messages
INFO:root:Got response from 24574
The log shows that the messages were exchanged successfully. Now I stop the worker process and let Gunicorn restart it:
$ kill 24580
[2023-01-31 14:22:40 +0800] [24580] [INFO] Worker exiting (pid: 24580)
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/util.py", line 319, in _exit_function
p.join()
File "/usr/lib/python3.6/multiprocessing/process.py", line 122, in join
assert self._parent_pid == os.getpid(), 'can only join a child process'
AssertionError: can only join a child process
[2023-01-31 14:22:40 +0800] [24553] [WARNING] Worker with pid 24574 was terminated due to signal 15
[2023-01-31 14:22:40 +0800] [29497] [INFO] Booting worker with pid: 29497
INFO:root:Sending request
INFO:root:Request sent
Question
Why doesn't s receive the message from the worker after the restart?
Besides, why is this 'can only join a child process' error raised? Does it have something to do with the problem?
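As far as I can tell, the message comes from Process.join(), which only lets the process that created a Process object join it. The following sketch reproduces the same AssertionError without Gunicorn, assuming a POSIX system with the fork start method and assertions enabled (the default): the parent starts a Process, forks, and the forked child (playing the role of a worker) tries to join it. The names idle and join_from_forked_child are hypothetical.

```python
import multiprocessing
import os
import time


def idle() -> None:
    # Stand-in for the data management loop.
    time.sleep(5)


def join_from_forked_child() -> str:
    ctx = multiprocessing.get_context("fork")
    proc = ctx.Process(target=idle, daemon=True)
    proc.start()  # proc is a child of *this* process

    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:  # forked child, analogous to a Gunicorn worker
        os.close(read_fd)
        try:
            proc.join()  # proc was not created by this process
            msg = "joined"
        except AssertionError as exc:
            msg = str(exc)
        os.write(write_fd, msg.encode())
        os._exit(0)  # skip the atexit handlers inherited from the parent

    # Parent: collect the child's result, then clean up its own child.
    os.close(write_fd)
    with os.fdopen(read_fd) as f:
        msg = f.read()
    os.waitpid(pid, 0)
    proc.terminate()
    proc.join()
    return msg


if __name__ == "__main__":
    print(join_from_forked_child())  # prints: can only join a child process
```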
Environment
- Python: 3.8.0
- Gunicorn: 20.1.0
- OS: Ubuntu 18.04
Related Questions
In this question, a similar problem is presented, and the solution was to use "multiprocessing.manager.queue". However, this didn't solve the issue in my case; the minimal example above already uses Manager queues.
Side Note
I already considered the following alternative designs:
- Use HTTP/gRPC/... to share the data: The data that I need to share isn't serializable
- Use threading.Thread instead of multiprocessing.Process for the data management process: the data management process initializes an object that throws an error when it is forked, so I cannot initialize this object within the Gunicorn master process.