How to trigger lifespan startup and shutdown while testing FastAPI app?

Being very new to FastAPI, I am struggling to test code that is slightly more complex than what I saw in the tutorial. I use the fastapi_cache module with Redis like this:

from fastapi import Depends, FastAPI, Query, Request
from fastapi_cache.backends.redis import CACHE_KEY, RedisCacheBackend
from fastapi_cache import caches, close_caches

app = FastAPI()

def redis_cache():
    return caches.get(CACHE_KEY)    

@app.get('/cache')
async def test(
    cache: RedisCacheBackend = Depends(redis_cache),
    n: int = Query(
        ..., 
        gt=-1
    )
):  
    # code that uses redis cache
    ...

@app.on_event('startup')
async def on_startup() -> None:
    rc = RedisCacheBackend('redis://redis')
    caches.set(CACHE_KEY, rc)

@app.on_event('shutdown')
async def on_shutdown() -> None:
    await close_caches()

test_main.py looks like this:

import pytest
from httpx import AsyncClient
from .main import app

@pytest.mark.asyncio
async def test_cache():
    async with AsyncClient(app=app, base_url="http://test") as ac:
        response = await ac.get("/cache?n=150")

When I run pytest, the cache variable is set to None and the test fails. I think I understand why the code is not working, but how do I fix it so that I can test my caching properly?

Slab answered 28/11, 2020 at 16:1 Comment(0)

The point is that httpx's AsyncClient does not implement the ASGI lifespan protocol, so it never triggers the startup and shutdown event handlers. To run them, you need to use LifespanManager.

Install: pip install asgi_lifespan

The code would look like this:

import pytest
from asgi_lifespan import LifespanManager
from httpx import AsyncClient
from .main import app


@pytest.mark.asyncio
async def test_cache():
    async with LifespanManager(app):
        async with AsyncClient(app=app, base_url="http://localhost") as ac:
            # `n` is a required query parameter of the /cache endpoint
            response = await ac.get("/cache?n=150")

More info here: https://github.com/encode/httpx/issues/350
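
To make the "lifespan protocol" mentioned above more concrete, here is a minimal stdlib-only sketch of the message exchange that something has to drive before startup and shutdown code runs. The names `toy_app`, `drive_lifespan` and `events` are hypothetical and exist only for this illustration; this is not how LifespanManager is implemented internally, just the same ASGI message sequence played by hand.

```python
import asyncio

events = []  # records what the toy app does, in order


async def toy_app(scope, receive, send):
    """Hypothetical bare ASGI app that only handles the lifespan scope."""
    if scope["type"] != "lifespan":
        return
    while True:
        message = await receive()
        if message["type"] == "lifespan.startup":
            events.append("startup ran")  # e.g. where a cache would be set up
            await send({"type": "lifespan.startup.complete"})
        elif message["type"] == "lifespan.shutdown":
            events.append("shutdown ran")  # e.g. where a cache would be closed
            await send({"type": "lifespan.shutdown.complete"})
            return


async def drive_lifespan(app):
    """Play the server's role: feed lifespan messages in, read replies out."""
    to_app = asyncio.Queue()
    from_app = asyncio.Queue()
    # The app runs concurrently and blocks on receive() between the two phases.
    task = asyncio.create_task(app({"type": "lifespan"}, to_app.get, from_app.put))

    await to_app.put({"type": "lifespan.startup"})
    reply = await from_app.get()
    assert reply["type"] == "lifespan.startup.complete"

    events.append("requests would be sent here")

    await to_app.put({"type": "lifespan.shutdown"})
    reply = await from_app.get()
    assert reply["type"] == "lifespan.shutdown.complete"
    await task  # let the app coroutine finish cleanly


asyncio.run(drive_lifespan(toy_app))
print(events)
```

If nothing sends the `lifespan.startup` message, the startup branch never runs, which matches the symptom in the question: the cache is never registered, so the dependency returns None.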

Moony answered 29/11, 2020 at 14:25 Comment(1)
I am using pytest 8.3.3; there seems to be no pytest.mark.asyncio. (comment by Sanitation)
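
On the comment above: as far as I know, `pytest.mark.asyncio` is not part of pytest itself. It is provided by the pytest-asyncio plugin (`pip install pytest-asyncio`). A minimal configuration sketch, assuming that plugin is installed and that a `pytest.ini` sits at the project root:

```ini
[pytest]
# Option provided by the pytest-asyncio plugin: collect `async def` tests
# without requiring an explicit @pytest.mark.asyncio marker on each one.
asyncio_mode = auto
```

In the plugin's strict mode the marker is still required on each async test, and async fixtures are declared with `@pytest_asyncio.fixture`.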

If you don't want to add a dependency just for tests, here is a simple implementation for asyncio:

import asyncio


class LifespanWaiter:
    def __init__(self, app):
        self.app = app
        self.startup = asyncio.Future()
        self.shutdown = asyncio.Future()

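    async def send(self, obj):
        # The app reports that its startup handlers have finished.
        if obj['type'] == 'lifespan.startup.complete':
            self.startup.set_result(None)

    async def receive(self):
        # First call: ask the app to run its startup handlers.
        if not self.startup.done():
            return {'type': 'lifespan.startup'}
        # Later calls: block until the context exits, then request shutdown.
        await self.shutdown
        return {'type': 'lifespan.shutdown'}

    async def __aenter__(self):
        # Keep a reference so the task cannot be garbage-collected mid-run.
        self.task = asyncio.create_task(
            self.app({"type": "lifespan"}, receive=self.receive, send=self.send)
        )
        await self.startup
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.shutdown.set_result(None)
        # Wait until the app has finished running its shutdown handlers.
        await self.task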

I use it like this:

@fixture(scope='session')
async def app(test_db_name):
    """Create the app instance and apply migrations."""
    DATABASES["db_options"]["database"] = test_db_name

    app = make_app()

    @app.on_event("startup")
    async def db_setup():
        async with app.state.db_pool.acquire() as conn:
            await apply_migrations(conn, 'local')

    async with LifespanWaiter(app):
        yield app


@fixture(scope='session')
async def client(app: FastAPI):
    async with AsyncClient(app=app, base_url="http://test") as client:
        yield client
Nf answered 16/2, 2023 at 10:18 Comment(0)
