what I've done for this case is to detect if I'm testing, and use fakeredis during tests. finally, in the test itself, I enqueue the redis worker task in synch mode:
first, define a function that detects if you're testing:
TESTING = len(sys.argv) > 1 and sys.argv[1] == 'test'
def am_testing():
return TESTING
then in your file that uses redis to queue up tasks, manage the queue this way.
you could extend get_queue to specify a queue name if needed:
if am_testing():
from fakeredis import FakeStrictRedis
from rq import Queue
def get_queue():
return Queue(connection=FakeStrictRedis())
else:
import django_rq
def get_queue():
return django_rq.get_queue()
then, enqueue your task like so:
queue = get_queue()
queue.enqueue(task_mytask, arg1, arg2)
finally, in your test program, run the task you are testing in synch mode, so that it runs in the same process as your test. As a matter of practice, I first clear the fakeredis queue, but I don't think its necessary since there are no workers:
from rq import Queue
from fakeredis import FakeStrictRedis
FakeStrictRedis().flushall()
queue = Queue(async=False, connection=FakeStrictRedis())
queue.enqueue(task_mytask, arg1, arg2)
my settings.py has the normal django_redis settings, so django_rq.getqueue() uses these when deployed:
RQ_QUEUES = {
'default': {
'HOST': env_var('REDIS_HOST'),
'PORT': 6379,
'DB': 0,
# 'PASSWORD': 'some-password',
'DEFAULT_TIMEOUT': 360,
},
'high': {
'HOST': env_var('REDIS_HOST'),
'PORT': 6379,
'DB': 0,
'DEFAULT_TIMEOUT': 500,
},
'low': {
'HOST': env_var('REDIS_HOST'),
'PORT': 6379,
'DB': 0,
}
}