How to create multiple workers in Python-RQ?

We were recently forced to replace Celery with RQ because it is simpler and Celery was giving us too many problems. Now we cannot find a way to create multiple queues dynamically, because we need to get multiple jobs done concurrently. Basically, every request to one of our routes should start a job, and it doesn't make sense to have multiple users wait for one user's job to finish before we can proceed with the next jobs. We periodically send a request to the server to get the status of the job and some metadata, so we can update the user with a progress bar (it could be a lengthy process, so this has to be done for the sake of UX).

We are using Django and Python's rq library. We are not using django-rq (please let me know if there are advantages to using it).

So far we start a task in one of our controllers like:

from redis import Redis
from rq import Queue

redis_conn = Redis()
q = Queue(connection=redis_conn)
job = q.enqueue(render_task, new_render.pk, domain=domain, data=csv_data, timeout=1200)

Then in our render_task method we add metadata to the job based on the state of the long-running task:

current_job = get_current_job()
current_job.meta['state'] = 'PROGRESS'
current_job.meta['process_percent'] = process_percent
current_job.meta['message'] = 'YOUTUBE'
current_job.save()

Now we have another endpoint that gets the current task and its metadata and passes it back to the client (this happens through a periodic AJAX request).
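A minimal sketch of such a polling endpoint (not the poster's actual code; names like `job_status_view` are illustrative, and the client is assumed to send the job id it got back when the job was enqueued):

```python
def job_status_payload(job):
    """Build the JSON-serializable dict the periodic AJAX request consumes."""
    return {
        'status': job.get_status(),           # 'queued' / 'started' / 'finished' / 'failed'
        'state': job.meta.get('state'),       # e.g. 'PROGRESS', set by the worker
        'percent': job.meta.get('process_percent'),
        'message': job.meta.get('message'),
    }

def job_status_view(request, job_id):
    # Django view: look the job up by id and report progress to the client.
    from django.http import JsonResponse
    from redis import Redis
    from rq.job import Job

    job = Job.fetch(job_id, connection=Redis())
    job.refresh()  # re-read the meta the worker wrote with current_job.save()
    return JsonResponse(job_status_payload(job))
```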

How do we go about running jobs concurrently without blocking other jobs? Should we make queues dynamically? Is there a way to make use of Workers in order to achieve this?

Surcharge answered 16/9, 2015 at 2:49 Comment(1)
@MostafaHussein We gave up on that project. It was so long ago that I don't remember. If you do find a solution, please make sure to post it here as an answer.Surcharge

As far as I know, RQ does not have any facility to manage multiple workers. You have to start a new worker process and define which queue it will consume. One way of doing this that works well for me is using Supervisor. In Supervisor you configure a worker for a given queue and the number of processes to run concurrently. For example, you can have a "high-priority" queue with 5 workers and a "low-priority" queue with 1 worker.
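The high/low-priority layout just described can be sketched in a Supervisor config like this (paths and program names are hypothetical; note `numprocs` and the `%(process_num)` expansion in `process_name`, which Supervisor requires whenever `numprocs > 1`):

```ini
[program:rq-high]
process_name=%(program_name)s_%(process_num)02d
command=/opt/app/venv/bin/rq worker high-priority
numprocs=5
autostart=true
autorestart=true

[program:rq-low]
process_name=%(program_name)s_%(process_num)02d
command=/opt/app/venv/bin/rq worker low-priority
numprocs=1
autostart=true
autorestart=true
```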

Fernandafernande answered 11/5, 2016 at 18:5 Comment(6)
May I ask if you need to add multiple similar sections in supervisor configuration file (i.e. one section for each desired worker), or whether there is a specific attribute to declare in a single section that multiple workers should run concurrently ?Haag
Use numproc=N to instruct supervisor to spawn N processes.Fernandafernande
Thanks ! actually, not "numproc" but "numprocs"Haag
Grrr. "numproc = 2" has no effect, while with "numprocs = 2" supervisor doesn't even start (with no reason showing up in the log). "supervisord --version" gives 3.2.0, so according to the docs it should be supported. By the way, adding multiple sections to spawn the same process works; what is happening here?Haag
@MarioOrlandi Yes, numprocs=N is the correct directive. Supervisor may fail to start if you do not include %(process_num) in the name of the process, e.g. process_name=foo_%(process_num)02d. This is only required for numprocs > 1.Fernandafernande
The selected answer is quite old; Python-RQ now supports running multiple worker instances. Please check the documentation on using systemd: python-rq.org/patterns/systemdRhyme
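Following that systemd pattern, a minimal template unit might look like the sketch below (all paths are hypothetical; the `@` in the filename makes it a template so several instances can run from one unit file):

```ini
# /etc/systemd/system/rqworker@.service  (hypothetical path)
[Unit]
Description=RQ worker %i
After=network.target

[Service]
WorkingDirectory=/opt/app
ExecStart=/opt/app/venv/bin/rq worker default
Restart=always

[Install]
WantedBy=multi-user.target
```

Five concurrent workers would then be started with something like `systemctl enable --now rqworker@{1..5}`.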

It is not only possible but ideal to run multiple workers. I use a bash file as the start command to enter the virtual env and launch with a custom Worker class.

Here's a supervisor config that has worked very well for me for RQ workers, including under a production workload. Note that startretries is high since this runs on AWS and needs retries during deployments.

[program:rq-workers]
process_name=%(program_name)s_%(process_num)02d
command=/usr/local/bin/start_rq_worker.sh
autostart=true
autorestart=true
user=root
numprocs=5
startretries=50
stopsignal=INT
killasgroup=true
stopasgroup=true
stdout_logfile=/opt/elasticbeanstalk/tasks/taillogs.d/super_logs.conf
redirect_stderr=true

Contents of start_rq_worker.sh

#!/bin/bash
date > /tmp/date
source /opt/python/run/venv/bin/activate
source /opt/python/current/env
/opt/python/run/venv/bin/python /opt/python/current/app/manage.py rqworker --worker-class rq.SimpleWorker default
Minneapolis answered 9/5, 2019 at 15:29 Comment(2)
According to the documentation, process_num is required if you specify numprocs > 1. Why do you not specify process_num?Perforce
@Perforce he did. In the first line: process_name=%(program_name)s_%(process_num)02d. process_num is not an argument but a parameter passed to process_nameKerman

I would like to suggest a very simple solution using django-rq:

Sample settings.py

...

RQ_QUEUES = {
    'default': {
        'HOST': os.getenv('REDIS_HOST', 'localhost'),
        'PORT': 6379,
        'DB': 0,
        'DEFAULT_TIMEOUT': 360,
    },
    'low': {
        'HOST': os.getenv('REDIS_HOST', 'localhost'),
        'PORT': 6379,
        'DB': 0,
        'DEFAULT_TIMEOUT': 360,
    }
}

...

Run Configuration

Run python manage.py rqworker default low as many times (each time in its own shell, or as its own Docker container, for instance) as the number of desired workers. The order of queues in the command determines their priority. At this point, all workers are listening to both queues.

In the Code

When calling a job to run, pass in the desired queue:

For high/normal priority jobs, you can make the call without any parameters and the job will enter the default queue. For low priority, you must specify the queue, either at the job level:

from django_rq import job

@job('low')
def my_low_priority_job():
    # some code

And then call my_low_priority_job.delay().

Alternatively, determine priority when calling:

import django_rq

queue = django_rq.get_queue('low')
queue.enqueue(my_variable_priority_job)
Campania answered 30/11, 2016 at 8:47 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.