Python asyncio Protocol behaviour with multiple clients and infinite loop

I'm having difficulty understanding the behaviour of my altered echo server, which attempts to take advantage of Python 3's asyncio module.

Essentially I have an infinite loop (let's say I want to stream some data from the server to the client indefinitely while the connection is open), e.g. MyServer.py:

#! /usr/bin/python3
import asyncio
import os
import time

class MyProtocol(asyncio.Protocol):

    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def connection_lost(self, exc):
        asyncio.get_event_loop().stop()

    def data_received(self, data):
        i = 0
        while True:
            self.transport.write(b'>> %i' %i)
            time.sleep(2)
            i+=1

loop = asyncio.get_event_loop()
coro = loop.create_server(MyProtocol, 
    os.environ.get('MY_SERVICE_ADDRESS', 'localhost'), 
    os.environ.get('MY_SERVICE_PORT', 8100))
server = loop.run_until_complete(coro)

try:
    loop.run_forever()
except:
    loop.run_until_complete(server.wait_closed())
finally:
    loop.close()

Next when I connect with nc ::1 8100 and send some text (e.g. "testing") I get the following:

user@machine$ nc ::1 8100
*** Connection from('::1', 58503, 0, 0) ***
testing
>> 1
>> 2
>> 3
^C

Now when I attempt to connect using nc again, I do not get any welcome message and after I attempt to send some new text to the server I get an endless stream of the following error:

user@machine$ nc ::1 8100
Is there anybody out there?
socket.send() raised exception
socket.send() raised exception
...
^C

Just to add salt to the wound, the "socket.send() raised exception" message continues to spam my terminal until I kill the Python server process...

As I'm new to web technologies (been a desktop dinosaur for far too long!), I'm not sure why I am getting the above behaviour and I haven't got a clue on how to produce the intended behaviour, which loosely looks like this:

  1. server starts
  2. client 1 connects to server
  3. server sends welcome message to client
  4. client 1 sends an arbitrary message
  5. server sends messages back to client 1 for as long as the client is connected
  6. client 1 disconnects (let's say the cable is pulled out)
  7. client 2 connects to server
  8. Repeat steps 3-6 for client 2

Any enlightenment would be extremely welcome!

Chromato answered 22/6, 2018 at 13:48 Comment(0)

There are multiple problems with the code.

First and foremost, data_received never returns. At the transport/protocol level, asyncio programming is single-threaded and callback-based. Application code is scattered across callbacks like data_received, and the event loop runs the show, monitoring file descriptors and invoking the callbacks as needed. Each callback is only allowed to perform a short calculation, invoke methods on transport, and arrange for further callbacks to be executed. What the callback cannot do is take a lot of time to complete or block waiting for something. A while loop that never exits is especially bad because it doesn't allow the event loop to run at all.

This is why the code only spits out exceptions once the client disconnects: connection_lost is never called. It's supposed to be called by the event loop, and the never-returning data_received is not giving the event loop a chance to resume. With the event loop blocked, the program is unable to respond to other clients, and data_received keeps trying to send data to the disconnected client, and logs its failure to do so.
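To see the starvation in isolation, here is a minimal sketch (not the question's server). A callback that blocks with time.sleep() delays a timer that was due after 10 ms. The names measure_delay, blocking_callback and timer_callback are made up for this illustration, and the blocking callback is finite so the demo can terminate.

```python
import asyncio
import time


def measure_delay(block_seconds):
    """Return how long a 10 ms timer really takes to fire when
    another callback blocks the loop for block_seconds."""
    loop = asyncio.new_event_loop()
    fired_at = []

    def blocking_callback():
        # Stands in for the never-returning data_received, but finite.
        time.sleep(block_seconds)

    def timer_callback():
        fired_at.append(loop.time())
        loop.stop()

    start = loop.time()
    loop.call_soon(blocking_callback)
    loop.call_later(0.01, timer_callback)
    try:
        loop.run_forever()
    finally:
        loop.close()
    return fired_at[0] - start
```

Calling measure_delay(0.2) should report a delay of roughly 0.2 seconds rather than 0.01, because the timer cannot run until the blocking callback returns. With a loop that never returns, the timer would never run at all.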

The correct way to express the idea can look like this:

def data_received(self, data):
    self.i = 0
    loop.call_soon(self.write_to_client)

def write_to_client(self):
    self.transport.write(b'>> %i' % self.i)
    self.i += 1
    loop.call_later(2, self.write_to_client)

Note how both data_received and write_to_client do very little work and quickly return. No calls to time.sleep(), and definitely no infinite loops - the "loop" is hidden inside the kind-of-recursive call to write_to_client.
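The self-rescheduling pattern can be tried without any sockets. The following is a sketch under my own assumptions: Ticker and run_ticker are hypothetical names, the interval is shortened to 10 ms, and the ticker stops after a fixed number of messages so that the script ends.

```python
import asyncio


class Ticker:
    """Emits b'>> 0', b'>> 1', ... every `interval` seconds, `count` times."""

    def __init__(self, loop, emit, interval, count):
        self.loop = loop
        self.emit = emit          # stands in for transport.write
        self.interval = interval
        self.remaining = count
        self.i = 0

    def tick(self):
        if self.remaining == 0:
            # Nothing left to send: stop the loop so the demo ends.
            self.loop.stop()
            return
        self.emit(b'>> %i' % self.i)
        self.i += 1
        self.remaining -= 1
        # Return immediately; the loop calls tick() again later.
        self.loop.call_later(self.interval, self.tick)


def run_ticker(count, interval=0.01):
    loop = asyncio.new_event_loop()
    out = []
    ticker = Ticker(loop, out.append, interval, count)
    loop.call_soon(ticker.tick)
    try:
        loop.run_forever()
    finally:
        loop.close()
    return out
```

run_ticker(3) collects three messages, and every call to tick() returns right away, so the event loop stays free to service other callbacks between ticks.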

This change reveals the second problem in the code: MyProtocol.connection_lost stops the whole event loop, so the program exits as soon as the first client disconnects and can never serve a second one. The fix could be to replace the stop() call with setting a flag in connection_lost:

def data_received(self, data):
    self._done = False
    self.i = 0
    loop.call_soon(self.write_to_client)

def write_to_client(self):
    if self._done:
        return
    self.transport.write(b'>> %i' % self.i)
    self.i += 1
    loop.call_later(2, self.write_to_client)

def connection_lost(self, exc):
    self._done = True

This allows multiple clients to connect.
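A variation on the flag, offered as a sketch rather than as the only way to do it: call_later returns a handle, and cancelling that handle in connection_lost removes the pending callback instead of letting it fire and return early. The class name StreamingProtocol, the INTERVAL attribute and the _started guard (which keeps a second data_received from starting a second chain of timers) are my additions. The sketch assumes Python 3.7+ for asyncio.get_running_loop().

```python
import asyncio


class StreamingProtocol(asyncio.Protocol):
    INTERVAL = 2  # seconds between messages, as in the question

    def __init__(self):
        self.transport = None
        self.i = 0
        self._timer = None      # pending TimerHandle, if any
        self._started = False

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        # Start streaming on the first message only.
        if self._started:
            return
        self._started = True
        self._write_to_client()

    def _write_to_client(self):
        self.transport.write(b'>> %i' % self.i)
        self.i += 1
        loop = asyncio.get_running_loop()
        self._timer = loop.call_later(self.INTERVAL, self._write_to_client)

    def connection_lost(self, exc):
        # Drop the scheduled write; nothing else keeps this client alive.
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
```

The class can be passed to loop.create_server in place of MyProtocol. Each connection gets its own instance, so one client disconnecting does not affect the others.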


Unrelated to the above issues, the callback-based code is a bit tiresome to write, especially when taking into account complicated code paths and exception handling. (Imagine trying to express nested loops with callbacks, or propagating an exception occurring inside a deeply embedded callback.) asyncio supports coroutine-based streams as an alternative to callback-based transports and protocols.

Coroutines allow writing natural-looking code that contains loops and looks like it contains blocking calls, which under the hood are converted into suspension points that enable the event loop to resume. Using streams, the code from the question would look like this:

import asyncio
import os

async def talk_to_client(reader, writer):
    peername = writer.get_extra_info('peername')
    print('Connection from {}'.format(peername))

    # Wait for the client's first message before starting to stream.
    data = await reader.read(1024)
    i = 0
    while True:
        writer.write(b'>> %i' % i)
        # Suspends while the peer is not reading; raises once it has disconnected.
        await writer.drain()
        # Non-blocking replacement for time.sleep(2).
        await asyncio.sleep(2)
        i += 1

loop = asyncio.get_event_loop()
coro = asyncio.start_server(talk_to_client,
    os.environ.get('MY_SERVICE_ADDRESS', 'localhost'),
    os.environ.get('MY_SERVICE_PORT', 8100))
server = loop.run_until_complete(coro)

loop.run_forever()

talk_to_client looks very much like the original implementation of data_received, but without the drawbacks. At each point where it uses await the event loop is resumed if the data is not available. time.sleep(n) is replaced with await asyncio.sleep(n) which does the equivalent of loop.call_later(n, <resume current coroutine>). Awaiting writer.drain() ensures that the coroutine pauses when the peer cannot process the output it gets, and that it raises an exception when the peer has disconnected.
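Since drain() raises once the peer is gone, the handler can catch that and clean up. Below is a sketch of one way to do it, assuming Python 3.7+ (for asyncio.run and serve_forever). The interval parameter, the trailing newline in each message (so that line-oriented clients such as nc print one message per line) and the main() helper are my additions, not part of the code above.

```python
import asyncio


async def talk_to_client(reader, writer, interval=2):
    try:
        # Wait for the client's first message before streaming.
        await reader.read(1024)
        i = 0
        while True:
            writer.write(b'>> %i\n' % i)
            await writer.drain()
            await asyncio.sleep(interval)
            i += 1
    except ConnectionError:
        # The peer went away (broken pipe, connection reset, ...).
        pass
    finally:
        # Release the socket whether we were disconnected or cancelled.
        writer.close()


async def main(host='localhost', port=8100):
    server = await asyncio.start_server(talk_to_client, host, port)
    async with server:
        await server.serve_forever()
```

Starting the server is then a matter of calling asyncio.run(main()). Each client is served by its own coroutine, so a disconnect ends only that client's loop.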

Pamper answered 22/6, 2018 at 19:57 Comment(1)
Thank you for this very informative (& an excellent) answer! It not only addresses the issues at hand but helps me understand / make better use of asyncio :-) - Chromato
