How can I add a connection timeout with asyncio?
Asked Answered
D

2

24

I want to connect to a list of a lot of different sites very fast. Im using asyncio to do this in an asynchronous manner and now want to add a timeout for when connections should be ignored if they take too long to respond.

How do I implement this?

import ssl
import asyncio
from contextlib import suppress
from concurrent.futures import ThreadPoolExecutor
import time


@asyncio.coroutine
def run():
    while True:
        host = yield from q.get()
        if not host:
            break

        with suppress(ssl.CertificateError):
            reader, writer = yield from asyncio.open_connection(host[1], 443, ssl=True) #timout option?
            reader.close()
            writer.close()


@asyncio.coroutine
def load_q():
    # only 3 entries for debugging reasons
    for host in [[1, 'python.org'], [2, 'qq.com'], [3, 'google.com']]:
        yield from q.put(host)
    for _ in range(NUM):
        q.put(None)


if __name__ == "__main__":
    NUM = 1000
    q = asyncio.Queue()

    loop = asyncio.get_event_loop()
    loop.set_default_executor(ThreadPoolExecutor(NUM))

    start = time.time()
    coros = [asyncio.async(run()) for i in range(NUM)]
    loop.run_until_complete(load_q())
    loop.run_until_complete(asyncio.wait(coros))
    end = time.time()
    print(end-start)

(On a sidenote: Has somebody an idea how to optimize this?)

Decoder answered 20/4, 2015 at 19:17 Comment(2)
You forgot to yield from the calls to q.put(None) inside load_q, so this code won't work as currently written.Topic
you don't need reader,writer here. You could use asyncio.create_connection with Protocol that does nothing (it closes the network connection as soon as it is established). Here's code example that I've tried on top million Alexa site list (it might be slightly outdated e.g., it doesn't use some convience functions such as asyncio.wait_for()). It uses a single thread and opens upto limit ssl connections.Nicolais
T
29

You can wrap the call to open_connection in asyncio.wait_for, which allows you to specify a timeout:

    with suppress(ssl.CertificateError):
        fut = asyncio.open_connection(host[1], 443, ssl=True)
        try:
            # Wait for 3 seconds, then raise TimeoutError
            reader, writer = yield from asyncio.wait_for(fut, timeout=3)
        except asyncio.TimeoutError:
            print("Timeout, skipping {}".format(host[1]))
            continue

Note that when TimeoutError is raised, the open_connection coroutine is also cancelled. If you don't want it to be cancelled (though I think you do want it to be cancelled in this case), you have wrap the call in asyncio.shield.

Topic answered 20/4, 2015 at 19:37 Comment(4)
but this will also make it a blocking call no? Like opening connections in normal loop one after other.Abbreviation
@ali No, because all the calls to the run method are wrapped in an asyncio.async call, which means they all run concurrently.Topic
If the connection timeout needs to be inside another coroutine, see [#28610034 asyncio force timeout) about stacking asyncio.ensure_future(asyncio.wait_for(create_connection()))Nymphet
I'm pretty sure this stopped working with 3.7 because of this change mentioned in the wait_for doc- Changed in version 3.7: When aw is cancelled due to a timeout, wait_for waits for aw to be cancelled. Previously, it raised asyncio.TimeoutError immediately.Ozan
S
0

Here is a similar snippet I have, tested with Python 3.11 and 3.12. It emits a "keepalive" rather than timing out, but you can remove the while True to do the same thing.

T = TypeVar('T')
U = TypeVar('U')


async def emit_keepalive_chunks(
        underlying: AsyncIterator[U],
        timeout: float | None,
        sentinel: T,
) -> AsyncIterator[U | T]:
    # Emit an initial keepalive, in case our async chunks are enormous
    yield sentinel

    maybe_next: asyncio.Future[U] | None = None

    try:
        maybe_next = asyncio.ensure_future(underlying.__anext__())
        while True:
            try:
                yield await asyncio.wait_for(asyncio.shield(maybe_next), timeout)
                maybe_next = asyncio.ensure_future(underlying.__anext__())
            except asyncio.TimeoutError:
                yield sentinel

    except StopAsyncIteration:
        pass

    finally:
        if maybe_next is not None:
            maybe_next.cancel()

Technical notes:

  • I think the change from Python 3.7 (mentioned in the other answer) to make this work is adding asyncio.shield.
  • I think the type hint for the output isn't strictly correct (this is an AsyncGenerator?)
Shortcake answered 23/6 at 21:30 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.