Django - working example of graphene aiodataloader

I've been looking into using a DataLoader in Django to optimise Graphene resolvers. I have been having trouble finding a working example of this as it requires multi threading. Based on what I can tell, there is no longer a way to specify executors in the latest versions of GraphQL in python. Does anyone have a working setup as an example?

I was following the example below, and every time I use async for my resolvers, I get:

RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-1_0'.

Which I think makes sense, because GraphQL uses a sync executor by default (I stand corrected). I got this example from: https://docs.graphene-python.org/en/latest/execution/dataloader/
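For what it's worth, the error can be reproduced with nothing but the standard library: calling asyncio.get_event_loop() from a worker thread that has no event loop set raises exactly this RuntimeError. A minimal sketch of the failure mode, independent of graphene:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def probe():
    # Worker threads have no event loop unless one is explicitly
    # created and set for them, which is why the DataLoader blows up
    # when the sync executor runs resolvers on a thread pool.
    try:
        asyncio.get_event_loop()
        return "found a loop"
    except RuntimeError as exc:
        return str(exc)

with ThreadPoolExecutor() as pool:
    msg = pool.submit(probe).result()

print(msg)  # on Python 3.11: There is no current event loop in thread '...'
```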

Source code:

dataloaders.py

from aiodataloader import DataLoader  # package is installed as aiodataloader

class UserDataLoader(DataLoader):
    async def batch_load_fn(self, keys):
        pass

class PostDataLoader(DataLoader):
    async def batch_load_fn(self, keys):
        pass
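(The comments below point out that batch_load_fn is left empty. Its contract is: receive a list of keys, do one fetch for the whole batch, and return one value per key in key order. A minimal sketch with a hypothetical in-memory dict standing in for the ORM; in real Django code the fetch would be something like await sync_to_async(User.objects.in_bulk)(keys).)

```python
import asyncio

# Hypothetical in-memory table standing in for User.objects
USERS = {1: "alice", 2: "bob", 3: "carol"}

async def batch_load_users(keys):
    # One fetch for the whole batch instead of one query per key.
    rows = {k: USERS[k] for k in keys if k in USERS}
    # DataLoader requires exactly one result per key, in key order,
    # with None for keys that did not match a row.
    return [rows.get(k) for k in keys]

loaded = asyncio.run(batch_load_users([2, 99, 1]))
print(loaded)  # ['bob', None, 'alice']
```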

graphql.py

import graphene
from graphene_django.types import DjangoObjectType
from .models import User, Post
from .dataloaders import UserDataLoader, PostDataLoader

class UserType(DjangoObjectType):
    class Meta:
        model = User

class PostType(DjangoObjectType):
    class Meta:
        model = Post

class Query(graphene.ObjectType):
    user = graphene.Field(UserType, id=graphene.Int())
    post = graphene.Field(PostType, id=graphene.Int())

    async def resolve_user(self, info, id):
        user_data_loader = UserDataLoader(info.context)
        return await user_data_loader.load(id)

    async def resolve_post(self, info, id):
        post_data_loader = PostDataLoader(info.context)
        return await post_data_loader.load(id)

schema = graphene.Schema(query=Query)

view.py

from graphene import BatchingExecutor # <- Does not exist in the library anymore
from graphene_django.views import GraphQLView

class NewDataLoaderGraphQLView(GraphQLView):
    executor = BatchingExecutor()

    def get_context(self, request):
        # Create new data loader instances for each request
        context = super().get_context(request)
        context.user_data_loader = UserDataLoader(context)
        context.post_data_loader = PostDataLoader(context)
        return context

urls.py

from django.urls import path
from .views import NewDataLoaderGraphQLView

urlpatterns = [
    path("graphql/", NewDataLoaderGraphQLView.as_view(graphiql=True)),
]

I'm running this using

./manage.py runserver 0.0.0.0:80

Note: This is not production code because I'm still testing this.


APPROACH ABANDONED

At this time, there are a lot of inconsistencies in concurrency support for Django + GraphQL; at least, I didn't have any luck finding anything reliable. The closest I came was the following library, which I am trialing: https://github.com/jkimbo/graphql-sync-dataloaders

Kipp answered 14/8, 2023 at 16:35

Comments (5):
- How do you get that error? Where is the code that caused this issue? Please add it. – Oestriol
- Which version of Python are you using? – Oestriol
- Python v3.11 and aiodataloader v0.4.0 – Kipp
- Please add the models too. – Oestriol
- You also had not defined the batch function. – Oestriol

The problem is that DataLoader needs a running event loop, but there is no running event loop when your NewDataLoaderGraphQLView.get_context() method runs. This is because manage.py runserver starts a synchronous development server by default.

But even running Django using daphne doesn't really help, because graphene-django isn't async-aware (yet).

However, I got this to work by using an asyncio.Runner and creating a new event loop for every request:

class BadAsyncDataLoaderGraphQLView(GraphQLView):
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.runner = asyncio.Runner()

    def get_context(self, request):
        context = super().get_context(request)
        # this creates a *new* event loop every time get_context() runs :(
        event_loop = self.runner.get_loop()
        context.user_data_loader = UserDataLoader(loop=event_loop)
        context.post_data_loader = PostDataLoader(loop=event_loop)
        return context

    def execute_graphql_request(self, *args, **kwargs):
        with self.runner:
            result = super().execute_graphql_request(*args, **kwargs)

            if hasattr(result, '__await__'):
                async def wait_for(value):
                    return await value

                result = self.runner.run(wait_for(result))

            return result

Again, this creates a new event loop for every request! So I wouldn't run this in production.

Your best option is probably to wait for #1394 to be merged. Or to continue using graphql-sync-dataloaders...

Aircrew answered 1/7 at 9:51

Comments (0)
