Django concurrency: can't create data for testing in async function?
  • Goal: test an async function against the test database.
  • Problem: I have async functions for Django Channels, and when I create data in setUp (or in the test itself), the data is not visible to the project code.
  • My tests (note: these tests are inside an APITestCase class):

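from asgiref.sync import sync_to_async
from channels.testing import WebsocketCommunicator
from rest_framework.test import APITestCase

# User and application are imported from the project (paths not shown here).


class WebsocketTests(APITestCase):
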
    async def test_connect(self):
        userx = await sync_to_async(User.objects.create)(username='newusername',email='[email protected]')
        users_number = await sync_to_async(User.objects.count)()
        print(users_number)  # prints 1, as expected
        communicator = WebsocketCommunicator(application, 'alerts/')
        connected, subprotocol = await communicator.connect()
        assert connected
        await communicator.disconnect()

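from urllib.parse import parse_qs

from channels.db import database_sync_to_async
# Assumed source of UntypedToken (djangorestframework-simplejwt).
from rest_framework_simplejwt.tokens import UntypedToken


@database_sync_to_async
def get_user(querys):
    print(User.objects.count())  # prints 0, although the test created one user

    try:
        token = parse_qs(querys.decode("utf8"))['token'][0]
        token_data = UntypedToken(token)
        user_id = token_data["user_id"]
    except Exception:  # missing, malformed or invalid token
        return 'token is invalid'

    try:
        return User.objects.get(id=user_id)
    except User.DoesNotExist:
        return 'AnonymousUser'


class QueryAuthMiddleware: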
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        scope['user'] = await get_user(scope["query_string"])
        return await self.app(scope, receive, send)


application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": QueryAuthMiddleware(
        URLRouter(
            websocket_urlpatterns
        )
    ),
})

# settings.py
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
        'OPTIONS': {
            'timeout': 20,  # in seconds
            # see also
            # https://docs.python.org/3.7/library/sqlite3.html#sqlite3.connect
        },
        'TEST': {
            'NAME': os.path.join(BASE_DIR, "db_test.sqlite3"),
        },
    }
}
  • I tried:
    1. replacing APITestCase with Django's TestCase
    2. switching to Postgres
    3. using sync_to_async (from asgiref.sync import sync_to_async)
Tavares answered 30/6, 2021 at 7:39 Comment(4)
Why does no one answer questions about async topics? - Tavares
You asked the question 1 hour ago, then 55 minutes ago got annoyed because nobody had answered? This entire community is async - you need to give it more time than that! I'd help, but I have little experience with either websockets or async, so I'm not sure where the problem lies here. - Erroneous
@michjich this is not my first Django Channels question. I asked two others a long time ago and no one answered them either, and when I search there is not much information about testing and advanced Channels topics. - Tavares
I have no experience with async, so to speak, but I would try making the setUp method async too. Also, no need to get frustrated; async topics receive fewer answers because there are fewer experts in those topics. - Shouse
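
One plausible explanation for the 1-versus-0 counts in the question, offered as an assumption rather than a confirmed diagnosis: Django's TestCase (and DRF's APITestCase, which builds on it) wraps each test in a transaction that is never committed, while database_sync_to_async may execute get_user on a different thread, and Django keeps a separate database connection per thread. A second connection cannot see rows that the first one has not committed. The sketch below reproduces that effect using only the standard-library sqlite3 module; the users table, the file name, and the helper names are hypothetical and are not part of the project in the question.

```python
import os
import sqlite3
import tempfile
import threading


def count_users_in_other_thread(db_path):
    """Count rows from a second thread that opens its own connection."""
    result = {}

    def worker():
        conn = sqlite3.connect(db_path)
        try:
            row = conn.execute("SELECT COUNT(*) FROM users").fetchone()
            result["count"] = row[0]
        finally:
            conn.close()

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return result["count"]


def demo():
    """Return (own_view, other_before_commit, other_after_commit)."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Hypothetical file name, standing in for the test database.
        db_path = os.path.join(tmpdir, "demo.sqlite3")
        writer = sqlite3.connect(db_path)
        try:
            writer.execute("CREATE TABLE users (username TEXT)")
            writer.commit()

            # The INSERT opens a transaction that stays uncommitted,
            # similar to data created inside a transaction-wrapped test.
            writer.execute("INSERT INTO users VALUES ('newusername')")
            own_view = writer.execute(
                "SELECT COUNT(*) FROM users"
            ).fetchone()[0]

            # A different thread with its own connection sees no row yet.
            before_commit = count_users_in_other_thread(db_path)

            # Once committed, the row becomes visible to other connections.
            writer.commit()
            after_commit = count_users_in_other_thread(db_path)
        finally:
            writer.close()
    return own_view, before_commit, after_commit


if __name__ == "__main__":
    print(demo())
```

If this is indeed the cause, basing the test class on django.test.TransactionTestCase, which commits test data instead of wrapping each test in a transaction, is a commonly suggested workaround. It has not been verified against the project in the question.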
