Could not use os.fork() to bind several processes to one socket server when using asyncio
We all know that using asyncio substantially improves the performance of a socket server, and obviously things get even better if we could take advantage of all the cores in our CPU (via the multiprocessing module, os.fork(), etc.).

I'm now trying to build a multicore socket server demo, with an asynchronous socket server listening on each core, all bound to one port. The idea is simply to create an async server, then call os.fork() and let the processes serve competitively.

However, the code that works fine on a single core runs into trouble when I try to fork. There seems to be a problem with registering the same file descriptors from different processes in the epoll selector module.

I'm showing some code below; can anyone help me out?


Here's a simple, logically clear echo server using asyncio:

import os
import asyncio #,uvloop
from socket import *

# handler sends the incoming message straight back
async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
async def create_server(loop):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(('', 25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever a connection is accepted, create a handler task in the event loop
async def serving(loop, sock):
    while True:
        client, addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop, client))

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
loop.create_task(serving(loop, sock))
loop.run_forever()

It works fine until I try to fork, after the socket is bound and before the server starts serving. (The same logic works fine in synchronous, threading-based code.)


When I try this:

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))

from multiprocessing import cpu_count
for num in range(cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # children break out of the loop, giving
        break               # one process per CPU core overall

loop.create_task(serving(loop, sock))
loop.run_forever()

Theoretically, the forked processes are bound to the same socket and run in the same event loop, so shouldn't they just work?

However I'm getting these error messages:

Task exception was never retrieved
future: <Task finished coro=<serving() done, defined at /home/new/LinuxDemo/temp1.py:21> exception=FileExistsError(17, 'File exists')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 262, in _add_reader
    key = self._selector.get_key(fd)
  File "/usr/local/lib/python3.7/selectors.py", line 192, in get_key
    raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/test/temp1.py", line 23, in serving
    client ,addr = await loop.sock_accept(sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 525, in sock_accept
    self._sock_accept(fut, False, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 538, in _sock_accept
    self.add_reader(fd, self._sock_accept, fut, True, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 335, in add_reader
    return self._add_reader(fd, callback, *args)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 265, in _add_reader
    (handle, None))
  File "/usr/local/lib/python3.7/selectors.py", line 359, in register
    self._selector.register(key.fd, poller_events)
FileExistsError: [Errno 17] File exists

Python version: 3.7.3.

I'm totally confused about what's going on.

Could anybody help? Thanks!

Tyndall answered 1/6, 2019 at 18:49 Comment(2)
This is a known issue of the event loop: bugs.python.org/issue21998 (Isa)
Thanks, it seems things go wrong because I'm sharing the event loop between processes. So is there any other method by which we can benefit from multiple cores to build a socket server in Python? (Tyndall)

According to the tracker issue, it is not supported to fork an existing asyncio event loop and attempt to use it from multiple processes. However, according to Yury's comment on the same issue, multi-processing can be implemented by forking before starting a loop, therefore running fully independent asyncio loops in each child.

Your code actually confirms this possibility: while create_server is async def, it doesn't await anything, nor does it use the loop argument. So we can implement Yury's approach by making create_server a regular function, removing the loop argument, calling it before os.fork(), and only running event loops after forking:

import os, asyncio, socket, multiprocessing

async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
def create_server():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('', 25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever a connection is accepted, create a handler task in the event loop
async def serving(loop, sock):
    while True:
        client, addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop, client))

sock = create_server()

for num in range(multiprocessing.cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # children break out of the loop, giving
        break               # one process per CPU core overall

loop = asyncio.get_event_loop()
loop.create_task(serving(loop, sock))
loop.run_forever()
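As a side note (my own sketch, not part of the original question): on Linux (kernel 3.9+), there is an alternative that avoids sharing a socket across processes entirely. Each forked process can create its *own* listening socket with SO_REUSEPORT set, all bound to the same port, and the kernel load-balances incoming connections among them. A minimal sketch, assuming `socket.SO_REUSEPORT` is available on your platform:

```python
import asyncio
import multiprocessing
import os
import socket

def create_reuseport_server(port=25000):
    # Each process builds its OWN listening socket; with SO_REUSEPORT the
    # kernel load-balances incoming connections across all of them.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    sock.bind(('', port))
    sock.listen()
    sock.setblocking(False)
    return sock

async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

async def serving(loop, sock):
    while True:
        client, addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop, client))

def main():
    # Fork first; each child then creates its own socket AND its own loop,
    # so nothing asyncio-related is ever shared across processes.
    for _ in range(multiprocessing.cpu_count() - 1):
        if os.fork() == 0:
            break
    loop = asyncio.new_event_loop()
    sock = create_reuseport_server()
    loop.create_task(serving(loop, sock))
    loop.run_forever()
```

Calling main() would start one echo server per core. Since no socket or loop object crosses a fork boundary here, the epoll double-registration problem from the question cannot occur.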
Polyphone answered 1/6, 2019 at 20:16 Comment(5)
Thanks, perfectly solved. Since PyPy 7.1.1 fully supports asyncio, I got a benchmark of about 800k echoes per second on my eight-core platform. Fabulous. But regrettably, PyPy has some trouble switching the event loop to the uvloop policy. (Tyndall)
I don't know whether it's caused by me doing something wrong. (Tyndall)
@LeeRoermond It's hard to tell how well-debugged the asyncio/uvloop combination is. But to be on the safe side with what you're doing, I'd move even the import of asyncio (and uvloop and anything gtk-related) until after the call to fork(). In other words, not only should the parent not run the event loop, it shouldn't even import asyncio/uvloop. (Polyphone)
Thanks for the reply. I've tested the latest version of uvloop on PyPy (installed via pip), and it seems it couldn't even drive a single-process echo server. So it's not a namespace or import problem, as far as I'm concerned. Building from source also failed because of some libev issue. A pity ╮(╯_╰)╭ (Tyndall)
@LeeRoermond uvloop makes heavy use of Cython and C extensions, which, while nominally supported by PyPy, tend not to work well in practice and hinder the JIT's ability to reason about the code. When using PyPy, I'd definitely stay with the built-in asyncio. (Polyphone)

I just made a quick test, and this works the way I would expect it to:

import asyncio
import os
import time
import socket
from pathlib import Path

async def other():
    while True:
       await asyncio.sleep(1)
       print("other {}".format(os.getuid()))

async def f():
    pid = os.fork()
    if pid == 0:
        loop = asyncio.new_event_loop()
        os.setuid(1000)
        server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        Path("/tmp/sock.whatever").unlink(missing_ok=True)
        server.bind("/tmp/sock.whatever")
        server.listen(1)
        while True:
            loop.run_until_complete(asyncio.sleep(1))
            print("child {} {}".format(os.getuid(), pid))
            connection, client_address = server.accept()
            data = connection.recv(1024)
            print('{} {} Received data: {}'.format(os.getuid(), pid, data.decode()))
    else:
        while True:
            print("parent {} {}".format(os.getuid(), pid))
            if Path("/tmp/sock.whatever").exists():
                client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                try:
                    client.connect("/tmp/sock.whatever")
                    message = 'foobar works {} {}'.format(os.getuid(), pid)
                    client.sendall(message.encode())
                    client.close()
                except OSError:
                    # the child may not be accepting yet; just retry next tick
                    pass
            await asyncio.sleep(1)

async def main():
    async with asyncio.TaskGroup() as tg:
        await asyncio.gather(tg.create_task(other()), tg.create_task(f()))

loop = asyncio.new_event_loop()
loop.run_until_complete(main())

Create a script with these contents, name it fork.py, and run it with sudo:

sudo python fork.py
parent 0 591480
other 0
parent 0 591480
child 1000 0
1000 0 Received data: foobar works 0 591480
other 0
child 1000 0
parent 0 591480
1000 0 Received data: foobar works 0 591480
other 0
parent 0 591480
child 1000 0
1000 0 Received data: foobar works 0 591480
other 0
parent 0 591480
child 1000 0
1000 0 Received data: foobar works 0 591480

As far as I know, you don't want to overwrite your parent's loop; rather, the child has to create its own event loop (based on what I've been able to gather on the subject).
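The essential pattern here can be boiled down to a tiny sketch (my own illustration, not taken from the code above): fork first, then have the child build a brand-new loop with asyncio.new_event_loop() and never touch any loop state inherited from the parent.

```python
import asyncio
import os

def child_main():
    # A brand-new loop created only after fork(); nothing is inherited
    # from a parent loop, so no selector/epoll state is shared.
    loop = asyncio.new_event_loop()
    try:
        # asyncio.sleep(0, result=...) just round-trips a value through the loop.
        return loop.run_until_complete(asyncio.sleep(0, result='child ok'))
    finally:
        loop.close()

pid = os.fork()
if pid == 0:
    # Exit code 0 only if the child's private loop ran the coroutine cleanly.
    os._exit(0 if child_main() == 'child ok' else 1)
_, status = os.waitpid(pid, 0)
child_exit = os.waitstatus_to_exitcode(status)  # Python 3.9+
```

The same pattern scales to one forked child per core, each with its own loop, which is exactly what the accepted answer does with a shared listening socket.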

Thousandth answered 25/12, 2023 at 9:34 Comment(0)