How I can reuse aiohttp.ClientSession?
Asked Answered
A

2

7

I am building own async python package and faced the problem.

This is my code:

class Client:
    """
    Async client for making requests
    """

    def __init__(self, base_url: str = BASE_URL) -> None:
        self.base_url = base_url
        self.session = ClientSession()

    async def get(self, method: str, *args: tp.Any, **kwargs: tp.Any) -> tp.Any:
        async with self.session.get(f'{self.base_url}/{method}', *args, **kwargs) as response:
            data = await response.json()
            return data

When I try to use something like this:

await client.get()

I get

RuntimeError: Timeout context manager should be used inside a task

I suppose that the reason of this error is calling ClientSession() not inside the coroutine. But I hope that somebody knows the way to re-use ClientSession()

I have already read other similar questions, but they are not suitable to my situation.

Allergen answered 10/2, 2021 at 12:56 Comment(0)
O
4

I ran into this issue a little while ago. I tried storing the aiohttp.ClientSession in the init function, but whenever I tried to use it, I got RuntimeError: Timeout context manager should be used inside a task.

This is OK, but the trick is to do it right. The aiohttp.ClientSession must be initialized inside a task, meaning an async function. The init function is not async, however it is still OK, as long as any parent function is async. Let me clarify with some code snippets.

import aiohttp
import asyncio


class MyClass:
    def __init__(self) -> None:
        self._session = aiohttp.ClientSession()

    async def make_http_get_request(self, url: str) -> str:
        async with self._session.request("GET", url) as response:
            response.raise_for_status()
            return await response.text()


def main() -> None:
    my_class = MyClass()
    html = asyncio.run(my_class.make_http_get_request("http://github.com/"))
    print(html)


if __name__ == "__main__":
    main()

This code will result in a RuntimeError: Timeout context manager should be used inside a task because the MyClass, and it's aiohttp.ClientSession is initialized outside of a task. The correct way would be:

import aiohttp
import asyncio


class MyClass:
    def __init__(self) -> None:
        self._session = aiohttp.ClientSession()

    async def make_http_get_request(self, url: str) -> str:
        async with self._session.request("GET", url) as response:
            response.raise_for_status()
            return await response.text()


async def main() -> None:
    my_class = MyClass()  # This is now being initilized inside a task (async function)
    html = await my_class.make_http_get_request("http://github.com/")
    print(html)


if __name__ == "__main__":
    asyncio.run(main())

Of course, you will still get some warnings like Unclosed client session and Unclosed connector with code like this, because you are not closing your resources properly. Let's write better code to take care of that:

import aiohttp
import asyncio


class MyClass:
    def __init__(self) -> None:
        self._session = aiohttp.ClientSession()

    async def close(self) -> None:
        await self._session.close()

    async def make_http_get_request(self, url: str) -> str:
        async with self._session.request("GET", url) as response:
            response.raise_for_status()
            return await response.text()


async def main() -> None:
    my_class = MyClass()
    html = await my_class.make_http_get_request("http://github.com/")
    print(html)

    await my_class.close()


if __name__ == "__main__":
    asyncio.run(main())

However, this would still leave us with these pesky warnings if we for some reason crash, due to some unforeseen exception. We can make this better yet, using context managers:

import aiohttp
import asyncio
from traceback import TracebackException
from types import TracebackType


class MyClass:
    def __init__(self) -> None:
        self._session = aiohttp.ClientSession()

    async def __aenter__(self) -> "MyClass":
        return self

    async def __aexit__(
        self,
        exc_type: Exception,
        exc_val: TracebackException,
        traceback: TracebackType,
    ) -> None:
        await self.close()

    async def close(self) -> None:
        await self._session.close()

    async def make_http_get_request(self, url: str) -> str:
        async with self._session.request("GET", url) as response:
            response.raise_for_status()
            return await response.text()


async def main() -> None:
    async with MyClass() as my_class:
        html = await my_class.make_http_get_request("http://github.com/")
        print(html)


if __name__ == "__main__":
    asyncio.run(main())

I hope this clears things up.

Ozonize answered 28/6, 2023 at 17:5 Comment(1)
It's great that you figured it out how to do it. But doesn't you solution proves that it does not make sense to use singleton on an async client? The topic of this question is if we can reuse the aiohttp's client session. And because we should close the session after using it, or as in your example, use the context manager - then we cannot reuse the client session anymore. Though there could be the need of hiding away some complex configuration of the client and thus saving on code in the main program - only then it would bring advantage, isn't?Eolic
D
1

You can initialize (and cache) the session when needed:

class Client:
    """
    Async client for making requests
    """

    def __init__(self, base_url: str = BASE_URL) -> None:
        self.base_url = base_url
        self.session = None

    async def get(self, method: str, *args: tp.Any, **kwargs: tp.Any) -> tp.Any:
        if not self.session:
            self.session = ClientSession()
        async with self.session.get(f'{self.base_url}/{method}', *args, **kwargs) as response:
            data = await response.json()
            return data

Depending on how you use the Client you can also use a class attribute for the session object.

Update:

ClientSession creation should be protected from race condition using Mutex:

_session_mutex = asyncio.Lock()

async def __create_session_if_required(self):
    if self.session is None:
        async with self._session_mutex:
            if self.session is None:
                self.session = aiohttp.ClientSession()
                # should be closed if not Singleton class: "await session.close()"

async def get(..):
    await self.__create_session_if_required()
    async with self.session.get() as response:
        # ...

Displease answered 14/2, 2021 at 16:40 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.