Best practice of testing django-rq ( python-rq ) in Django
B

8

16

I'll start using django-rq in my project.

Django integration with RQ, a Redis based Python queuing library.

What is the best practice for testing Django apps that use RQ?

For example, if I want to test my app as a black box: after the user performs some actions, I want to execute all jobs in the current queue and then check the results in my DB. How can I do this in my Django tests?

Burns answered 1/7, 2012 at 13:27 Comment(2)
You've got some good answers below -- why not accept one? Good luck!Republic
None of the current answers say how to make sure the Redis instance is mocked or separated from the real one (the way Django isolates the test database).Liddie
K
9

I just found django-rq, which allows you to spin up a worker in a test environment that executes any tasks on the queue and then quits.

from django.test import TestCase
from django_rq import get_worker

class MyTest(TestCase):
    def test_something_that_creates_jobs(self):
        ...                      # Stuff that init jobs.
        get_worker().work(burst=True)  # Processes all jobs then stop.
        ...                      # Asserts that the job stuff is done.
Kiushu answered 5/9, 2012 at 2:22 Comment(1)
This still uses a real Redis instance for collecting jobs, instead of a mock Redis instance, or a namespaced instance.Liddie
S
6

None of the answers above really shows how to test without having Redis installed while still using the Django settings. I found that including the following code in the tests does not affect the project itself, yet provides everything needed.

The code uses fakeredis to pretend a Redis service is available, and sets up the connection before django-rq reads the settings.

By default, fakeredis connections do not share state (the server), so every caller must get the same connection. The class below is therefore a singleton that reuses one connection.

import django_rq
from fakeredis import FakeStrictRedis, FakeRedis

class FakeRedisConn:
    """Singleton FakeRedis connection."""

    def __init__(self):
        self.conn = None

    def __call__(self, _, strict):
        if not self.conn:
            self.conn = FakeStrictRedis() if strict else FakeRedis()
        return self.conn


django_rq.queues.get_redis_connection = FakeRedisConn()

def test_case():
   ...

fakeredis also supports this directly through FakeRedisConnSingleton:

import django_rq
from fakeredis import FakeRedisConnSingleton

django_rq.queues.get_redis_connection = FakeRedisConnSingleton()
Sicken answered 27/5, 2022 at 22:4 Comment(2)
Add import from fakeredis import FakeStrictRedis, FakeRedisCrandale
@danielm - Thank you for your contributions! Do you want to edit this comment to clarify that you have added this to the fakeredis codebase? People can import from fakeredis import FakeRedisConnSingleton and then set django_rq.queues.get_redis_connection = FakeRedisConnSingleton()Electuary
L
4

I separated my rq tests into a few pieces.

  1. Test that I'm correctly adding things to the queue (using mocks).
  2. Assume that if something gets added to the queue, it will eventually be processed. (rq's test suite should cover this).
  3. Test, given the correct input, my tasks work as expected. (normal code tests).
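
As a self-contained illustration of the first piece, here is a minimal sketch that checks what gets enqueued without touching Redis. The names `enqueue_mail_jobs` and `check_enqueue_calls` are hypothetical, and the body of `mail_user` is only a placeholder for a real task.

```python
from unittest.mock import Mock, call


def mail_user(user_id):
    """Placeholder task; a real one would send an email."""
    return f"mailed {user_id}"


def enqueue_mail_jobs(queue, user_ids):
    """Enqueue one mail_user job per user id on the given queue."""
    for user_id in user_ids:
        queue.enqueue(mail_user, user_id)


def check_enqueue_calls():
    """Return True if the expected jobs were enqueued, in order."""
    queue = Mock()  # stands in for an rq.Queue, so no Redis is needed
    enqueue_mail_jobs(queue, [1, 2])
    return queue.enqueue.mock_calls == [call(mail_user, 1), call(mail_user, 2)]
```

Passing the queue in as an argument is a design choice of this sketch; it avoids patching altogether. The third piece is then an ordinary unit test that calls `mail_user` directly.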

Code being tested:

def handle(self, *args, **options):
    uid = options.get('user_id')

    # @@@ Need to exclude out users who have gotten an email within $window
    # days.
    if uid is None:
        uids = User.objects.filter(is_active=True, userprofile__waitlisted=False).values_list('id', flat=True)
    else:
        uids = [uid]

    q = rq.Queue(connection=redis.Redis())

    for user_id in uids:
        q.enqueue(mail_user, user_id)

My tests:

from unittest.mock import ANY, call, patch

class DjangoMailUsersTest(DjangoTestCase):
    def setUp(self):
        self.cmd = MailUserCommand()

    # Decorators apply bottom-up, so the rq.Queue mock is the first argument.
    @patch('redis.Redis')
    @patch('rq.Queue')
    def test_no_userid_queues_all_userids(self, queue, _):
        u1 = UserF.create(userprofile__waitlisted=False)
        u2 = UserF.create(userprofile__waitlisted=False)
        self.cmd.handle()
        # assertCountEqual is the Python 3 name for assertItemsEqual.
        self.assertCountEqual(queue.return_value.enqueue.mock_calls,
                              [call(ANY, u1.pk), call(ANY, u2.pk)])

    @patch('redis.Redis')
    @patch('rq.Queue')
    def test_waitlisted_people_excluded(self, queue, _):
        u1 = UserF.create(userprofile__waitlisted=False)
        UserF.create(userprofile__waitlisted=True)
        self.cmd.handle()
        self.assertCountEqual(queue.return_value.enqueue.mock_calls, [call(ANY, u1.pk)])
Loveless answered 26/1, 2013 at 19:7 Comment(2)
Upvote for the correct approach -- not sure about the code. Doing tests with the whole Redis stack in play is a bad idea.Republic
Patching rq.Queue didn't work for me. I couldn't see any of the calls. I had to patch myapp.my_module.django_rq. toptal.com/python/an-introduction-to-mocking-in-python has a mantra: "Mock an item where it is used, not where it came from."Contumacy
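
The "mock where it is used" point from the comment above can be seen in a small standard-library sketch. It uses `json.dumps` instead of RQ, and the function names `serialize` and `demo` are made up for illustration. In a real project you would normally pass the dotted path of the using module to `patch`, as the commenter did with `myapp.my_module.django_rq`; patching `globals()` here is only a way to keep the example in one file.

```python
from json import dumps  # binds a second reference to json.dumps in this module
from unittest.mock import Mock, patch


def serialize(payload):
    # Looks up the module-level name `dumps`, not `json.dumps`.
    return dumps(payload)


def demo():
    # Patching the source module leaves this module's `dumps` untouched.
    with patch("json.dumps", return_value="patched"):
        at_source = serialize({"a": 1})
    # Patching the name in the namespace where it is used takes effect.
    with patch.dict(globals(), {"dumps": Mock(return_value="patched")}):
        where_used = serialize({"a": 1})
    return at_source, where_used
```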
C
1

I committed a patch that lets you do:

from django.test import TestCase
from django_rq import get_queue

class MyTest(TestCase):
    def test_something_that_creates_jobs(self):
        # `async` is a reserved word since Python 3.7; RQ and django-rq now call this argument `is_async`.
        queue = get_queue(is_async=False)
        queue.enqueue(func) # func will be executed right away
        # Test for job completion

This should make testing RQ jobs easier. Hope that helps!

Comeuppance answered 22/3, 2013 at 1:53 Comment(1)
Why not one global setting as in celery?Copier
M
1

Just in case this is helpful to anyone: I used a patch with a custom mock object whose enqueue runs the job right away.

from unittest.mock import patch

# Patch django_rq.get_queue
with patch('django_rq.get_queue', return_value=MockBulkJobGetQueue()) as mock_django_rq_get_queue:
    # Perform the web operation that starts the job. In my case, a POST to a URL.
    ...

Then the mock object just had one method:

class MockBulkJobGetQueue(object):

    def enqueue(self, f, *args, **kwargs):
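        # Call the function with the explicit kwargs={...} given to enqueue.
        # Default to {} so that ** does not fail when no kwargs are passed.
        f(
            **kwargs.pop('kwargs', {})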
        )
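
A slightly more general variant of the same idea, as a rough sketch: the class name `ImmediateQueue` is hypothetical, and unlike a real RQ queue it returns the function's result instead of a job object and does not handle RQ options such as timeouts.

```python
class ImmediateQueue:
    """Hypothetical stand-in for an RQ queue that runs jobs inline."""

    def __init__(self):
        self.results = []

    def enqueue(self, func, *args, **kwargs):
        # RQ accepts both enqueue(f, 1, b=2) and the explicit form
        # enqueue(f, args=(1,), kwargs={'b': 2}); support both here.
        call_args = kwargs.pop("args", None) or args
        call_kwargs = kwargs.pop("kwargs", None) or kwargs
        result = func(*call_args, **call_kwargs)
        self.results.append(result)
        return result
```

An instance can be used as the `return_value` of the `django_rq.get_queue` patch in the same way as the mock object above.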
Mahmoud answered 31/7, 2016 at 18:31 Comment(0)
D
1

What I've done for this case is to detect whether I'm testing and use fakeredis during tests. Finally, in the test itself, I enqueue the task in synchronous mode.

First, define a function that detects whether you're testing:

import sys

TESTING = len(sys.argv) > 1 and sys.argv[1] == 'test'

def am_testing():
    return TESTING

Then, in the file that uses Redis to queue up tasks, manage the queue this way. You could extend get_queue to take a queue name if needed:

if am_testing():
    from fakeredis import FakeStrictRedis 
    from rq import Queue
    def get_queue():
        return Queue(connection=FakeStrictRedis())

else:
    import django_rq
    def get_queue():
        return django_rq.get_queue()

Then, enqueue your task like so:

queue = get_queue()
queue.enqueue(task_mytask, arg1, arg2)

Finally, in your test program, run the task you are testing in synchronous mode, so that it runs in the same process as your test. As a matter of practice, I first clear the fakeredis queue, but I don't think it's necessary since there are no workers:

from rq import Queue
from fakeredis import FakeStrictRedis

FakeStrictRedis().flushall()
# `async` is a reserved word since Python 3.7; current RQ names this argument `is_async`.
queue = Queue(is_async=False, connection=FakeStrictRedis())
queue.enqueue(task_mytask, arg1, arg2)

My settings.py has the normal django-rq settings, so django_rq.get_queue() uses these when deployed:

RQ_QUEUES = {
    'default': {
        'HOST': env_var('REDIS_HOST'),
        'PORT': 6379,
        'DB': 0,
        # 'PASSWORD': 'some-password',
        'DEFAULT_TIMEOUT': 360,
    },
    'high': {
        'HOST': env_var('REDIS_HOST'),
        'PORT': 6379,
        'DB': 0,
        'DEFAULT_TIMEOUT': 500,
    },
    'low': {
        'HOST': env_var('REDIS_HOST'),
        'PORT': 6379,
        'DB': 0,
    }
}
Debug answered 22/11, 2017 at 22:37 Comment(0)
B
0

You'll need your tests to pause while there are still jobs in the queue. To do this, you can check Queue.is_empty(), and suspend execution if there are still jobs in the queue:

import time
from unittest import TestCase  # django.utils.unittest was removed in Django 1.9
import django_rq

class TestQueue(TestCase):

    def test_something(self):
        # simulate some User actions which will queue up some tasks

        # Wait for the queued tasks to run
        queue = django_rq.get_queue('default')
        while not queue.is_empty():
            time.sleep(5) # adjust this depending on how long your tasks take to execute

        # queued tasks are done, check state of the DB
        ...  # assertions on the DB state go here
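
If no worker is running, the loop above never ends. A bounded version of the wait could look like the sketch below; `wait_until_empty` and its parameters are hypothetical names, and the only thing it assumes about the queue is an `is_empty()` method. Note that an empty queue only means the jobs have been picked up, not necessarily that they have finished.

```python
import time


def wait_until_empty(queue, timeout=30.0, interval=0.5):
    """Poll queue.is_empty() until it is True or `timeout` seconds pass.

    Returns True if the queue drained in time, False otherwise.
    """
    deadline = time.monotonic() + timeout
    while not queue.is_empty():
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
    return True
```

In a test this could be used as `self.assertTrue(wait_until_empty(queue))`, so a stuck queue fails the test instead of hanging it.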
Barkeeper answered 7/8, 2012 at 12:7 Comment(2)
Is there any way to change the queues to a separate testing queue? And is it possible to run Redis and a worker from ./manage.py test?Kiushu
Actually another problem is getting the queues to use the test database. Look at naming the test database for a start. I wouldn't suggest running Redis from within manage.py test, it's just unneeded extra complexity. You might find this useful: bruno.im/2012/may/30/rq-tipsBarkeeper
P
0

I came across the same issue. In addition, my jobs run some mailing functionality, and I wanted to check the Django test mailbox for the resulting emails. However, since Django RQ does not execute the jobs in the same context as the Django test, the emails that are sent do not end up in the test mailbox.

Therefore I need to execute the jobs in the same context. This can be achieved by:

from django_rq import get_queue
queue = get_queue('default')
queue.enqueue(some_job_callable)

# execute input watcher
jobs = queue.get_jobs()

# execute in the same context as test
while jobs:
    for job in jobs:
        queue.remove(job)
        job.perform()
    jobs = queue.get_jobs()

# check no jobs left in queue
assert not jobs

Here you just get all the jobs from the queue and execute them directly in the test. One can nicely implement this in a TestCase class and reuse this functionality.
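
One possible shape for such a reusable helper is sketched below. The mixin name, the method name and the `max_rounds` guard are assumptions of this sketch; it relies only on the `get_jobs()`, `remove()` and `perform()` calls already used above.

```python
class RunQueuedJobsMixin:
    """Hypothetical mixin that runs queued jobs inside the test process."""

    def run_queued_jobs(self, queue, max_rounds=100):
        """Run jobs until the queue is empty; return how many were executed."""
        executed = 0
        for _ in range(max_rounds):
            jobs = queue.get_jobs()
            if not jobs:
                return executed
            for job in jobs:
                queue.remove(job)
                job.perform()  # runs in the same context as the test
                executed += 1
        # Jobs that keep enqueueing new jobs would otherwise loop forever.
        raise AssertionError("queue did not drain after %d rounds" % max_rounds)
```

A test class would list the mixin next to TestCase and call `self.run_queued_jobs(get_queue('default'))` before its assertions.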

Poltergeist answered 28/2, 2019 at 13:3 Comment(0)
