Future task attached to a different loop

I am trying to connect to MongoDB from a FastAPI application using Motor, and I repeatedly get the exception shown below.

File - main.py

app = FastAPI(
    title=config.PROJECT_NAME, docs_url="/api/docs", openapi_url="/api"
)

@app.get("/api/testing")
async def testit():
    user_collection = readernetwork_db.get_collection("user_collection")
    all_users = await user_collection.find_one({"email": "sample_email"})
    print("all users --- ", all_users)
    return all_users

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", reload=True, port=8888)

File - session.py

import motor.motor_asyncio
from app.core import config


print("here we go again....")
client = motor.motor_asyncio.AsyncIOMotorClient(
    config.MONGOATLAS_DATABASE_URI)
readernetwork_db = client.get_database("readernetwork")

Exception:

all_users = await user_collection.find_one({"email": "sample_email"})

RuntimeError: Task <Task pending name='Task-4' coro=<RequestResponseCycle.run_asgi() running at /usr/local/lib/python3.8/site-packages/uvicorn/protocols/http/h11_impl.py:389> cb=[set.discard()]> got Future <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.8/asyncio/futures.py:360]> attached to a different loop

I don't know what I am doing wrong. Should I pass an event loop to Motor explicitly?

Wyant answered 2/1, 2021 at 17:35 Comment(0)

You can keep the MongoDB Motor client in the global scope, but it should be created and closed inside an async function. The preferred place to do that is in the application's startup and shutdown handlers, like so:

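# mongodb.py
from typing import Optional

from motor.motor_asyncio import AsyncIOMotorClient

# DB_URL is assumed to be defined elsewhere (e.g. imported from your settings).

db_client: Optional[AsyncIOMotorClient] = None


async def get_db_client() -> Optional[AsyncIOMotorClient]:
    """Return database client instance."""
    return db_client


async def connect_db() -> None:
    """Create database connection."""
    global db_client
    db_client = AsyncIOMotorClient(DB_URL)


async def close_db() -> None:
    """Close database connection."""
    if db_client is not None:
        db_client.close()


# main.py
from fastapi import FastAPI

from mongodb import close_db, connect_db  # adjust to your package layout

app = FastAPI(title=PROJECT_NAME)
...
app.add_event_handler("startup", connect_db)
app.add_event_handler("shutdown", close_db)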

Note that you need the line global db_client to modify the global variable defined beforehand.
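
For context, the error message itself is generic asyncio behaviour rather than something specific to Motor: a task running on one event loop cannot await a future that belongs to a different loop. The sketch below reproduces the message using only the standard library. The function names are illustrative and not part of any library, and it is only an assumption (consistent with the question) that a client created at import time ends up tied to a loop other than the one serving requests.

```python
import asyncio


async def await_foreign_future(fut):
    """Await a future owned by another loop and return the error text, if any."""
    try:
        await fut
    except RuntimeError as exc:
        return str(exc)
    return None


def demo():
    # Create a future on a loop that is NOT the one asyncio.run() will use.
    other_loop = asyncio.new_event_loop()
    fut = other_loop.create_future()
    try:
        # asyncio.run() starts a fresh loop; the task inside it cannot
        # wait on a future that belongs to other_loop.
        return asyncio.run(await_foreign_future(fut))
    finally:
        other_loop.close()


if __name__ == "__main__":
    print(demo())
```

The returned text ends with "attached to a different loop", the same wording as in the traceback above. Creating the client inside a startup handler avoids this because the client is then constructed while the serving loop is already running.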

Dizzy answered 3/1, 2021 at 9:35 Comment(6)
Maybe I should ask a new question, but I find this problem comes back when I use fastapi, motor and fastapi-users, since fastapi-users requires something like user_db = MongoDBUserDatabase(UserDB, db["users"]). How can we set up user_db in the app.add_event_handler lines? - Remains
This was super helpful. We had a file that initialized a MongoDB connection (using motor, via ODMantic) at the package level. Putting the initialization in an async function - and then calling that function to retrieve a database connection - fixed our issue. Thank you, @alex_noname. - Dryasdust
This was very helpful. One note - declaring db_client globally in any file didn't play nicely on my system (I'd get "None has no method get_database()") - instead, just combining connect_db and get_db into one function did the trick: async def get_db_client() -> AsyncIOMotorClient: """Return database client instance.""" db_client = AsyncIOMotorClient(settings.MONGO_URI) return db_client - Lenz
@Dryasdust Can you share an example of how you accomplished that? I'm in the same situation. - Unharness
"You can have mongodb motor client in the global scope, but creating and closing it should be done inside an async function" Where does this claim come from? - Irregularity
@AndréCosta, at the global scope for the file, we have _engine: Optional[AIOEngine] = None. Then we have this function: async def get_engine() -> AIOEngine:. In this function, we do global _engine and then check if _engine. If that passes, it's returned. Otherwise, it's created and returned. We also have a function called set_engine_connection(conn: str, db: str): that allows overriding the current _engine to be connected to a test database (which is then called when bootstrapping integration tests). - Dryasdust
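
The lazy-initialization pattern described in the last comment can be sketched roughly as follows. FakeClient is a hypothetical stand-in for AIOEngine or AsyncIOMotorClient so that the example runs without a database, get_client and set_client are illustrative names loosely mirroring get_engine and set_engine_connection, and the default URI is a placeholder.

```python
import asyncio
from typing import Optional


class FakeClient:
    """Hypothetical stand-in for a real driver client (not a Motor/ODMantic class)."""

    def __init__(self, uri: str) -> None:
        self.uri = uri
        # Record the loop that was running when the client was created.
        self.loop = asyncio.get_running_loop()


# Nothing is created at import time; the slot is filled on first use.
_client: Optional[FakeClient] = None


async def get_client(uri: str = "mongodb://localhost:27017") -> FakeClient:
    """Create the client on first use, inside the running loop, then reuse it."""
    global _client
    if _client is None:
        _client = FakeClient(uri)  # placeholder URI, an assumption
    return _client


def set_client(client: Optional[FakeClient]) -> None:
    """Override or reset the cached client, e.g. for integration tests."""
    global _client
    _client = client
```

Because the client is only constructed from within a coroutine, it is always created while the loop that will use it is running.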

You can avoid global altogether by storing the client on the application state, which is reachable from the request passed to any route.

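# main.py
from fastapi import FastAPI, Request
from motor.motor_asyncio import AsyncIOMotorClient

app = FastAPI()

# DB_URL is assumed to be defined elsewhere.

async def open_db() -> None:
    app.state.mongodb = AsyncIOMotorClient(DB_URL)

async def close_db() -> None:
    app.state.mongodb.close()

app.add_event_handler('startup', open_db)
app.add_event_handler('shutdown', close_db)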

Within each request to a given route you will have access to the app state. For example,

# app.get (rather than app.route) lets FastAPI inject the username path parameter
@app.get('/{username}')
async def index(request: Request, username: str):
    user = await request.app.state.mongodb['auth']['users'].find_one({"username" : username})

You can make this even easier by storing a specific collection on the state in your open_db function, for example under the name users:

async def open_db() -> None:
    app.state.mongodb = AsyncIOMotorClient(DB_URL)
    app.state.users = app.state.mongodb['auth']['users']

Now you can just do this,

@app.get('/{username}')
async def index(request: Request, username: str):
    user = await request.app.state.users.find_one({"username" : username})
Tripoli answered 16/2, 2022 at 17:21 Comment(0)
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
client = AsyncIOMotorClient()
client.get_io_loop = asyncio.get_event_loop

Works for me.

Vanderpool answered 18/8, 2022 at 1:26 Comment(1)
My problem was raising async subscribers with pubsub in different threads inside the same Kubernetes pod, and this was the solution for me too. - Withindoors
