How to implement timeout in asyncio server?
Asked Answered
F

2

6

Below is a simple echo server. But if the client does not send anything for 10 seconds, I want to close the connection.

import asyncio


async def process(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    print("awaiting for data")
    line = await reader.readline()
    print(f"received {line}")
    writer.write(line)
    print(f"sent {line}")
    await writer.drain()
    print(f"Drained")


async def timeout(task: asyncio.Task, duration):
    print("timeout started")
    await asyncio.sleep(duration)
    print("client unresponsive, cancelling")
    task.cancel()
    print("task cancelled")


async def new_session(reader, writer):
    print("new session started")
    task = asyncio.create_task(process(reader, writer))
    timer = asyncio.create_task(timeout(task, 10))
    await task
    print("task complete")
    timer.cancel()
    print("timer cancelled")
    writer.close()
    print("writer closed")


async def a_main():
    server = await asyncio.start_server(new_session, port=8088)
    await server.serve_forever()


if __name__ == '__main__':
    asyncio.run(a_main())

If the client sends a message, it works fine. But the other case, when client is silent, it does not work

When client sends message:

new session started
awaiting for data
timeout started
received b'slkdfjsdlkfj\r\n'
sent b'slkdfjsdlkfj\r\n'
Drained
task complete
timer cancelled
writer closed

When client is silent after opening connection

new session started
awaiting for data
timeout started
client unresponsive, cancelling
task cancelled

There is no task complete, timer cancelled, writer closed.

  1. What is the issue with above code?
  2. Is there a better way to implement timeouts?

Update

Figured out the problem, Looks like the task was actually cancelled, but the exception got silently ignored, Fixed the problem by catching CancelledError

async def new_session(reader, writer):
    print("new session started")
    task = asyncio.create_task(process(reader, writer))
    timer = asyncio.create_task(timeout(task, 10))
    try:
        await task
    except asyncio.CancelledError:
        print(f"Task took too long and was cancelled by timer")
    print("task complete")
    timer.cancel()
    print("timer cancelled")
    writer.close()
    print("writer closed")

Second part still remains. Is there a better way to implement timeouts?


Update2

Complete code using wait_for. The timeout code is no longer needed. Check accepted solution below:

async def new_session(reader, writer):
    print("new session started")
    try:
        await asyncio.wait_for(process(reader, writer), timeout=5)
    except asyncio.TimeoutError as te:
        print(f'time is up!{te}')
    finally:
        writer.close()
        print("writer closed")
Flexuous answered 27/12, 2018 at 20:9 Comment(2)
You can use asyncio.wait_for instead of timeout.Excommunicative
@Excommunicative Thanks. Do you want to add it as an answer? I will accept.Flexuous
E
4

Is there a better way to implement timeouts?

You can use asyncio.wait_for instead of timeout. It has similar semantics, but already comes with asyncio. Also, you can await the future it returns to detect if the timeout has occurred.

Excommunicative answered 28/12, 2018 at 15:6 Comment(0)
A
11

I use the following code when making a connection. I'd suggest using wait_for similarly for your code.

fut = asyncio.open_connection( self.host, self.port, loop=self.loop )
try:
   r, w = await asyncio.wait_for(fut, timeout=self.connection_timeout)
except asyncio.TimeoutError:
   pass
Aalto answered 27/12, 2018 at 22:17 Comment(3)
open_connection is when you are a client. The question is for server.Flexuous
It is an example of how to use wait_for which is the pythonic solution for what you want to do.Aalto
I didn't realise it can be used for any awaitable. Upvoted.Flexuous
E
4

Is there a better way to implement timeouts?

You can use asyncio.wait_for instead of timeout. It has similar semantics, but already comes with asyncio. Also, you can await the future it returns to detect if the timeout has occurred.

Excommunicative answered 28/12, 2018 at 15:6 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.