I need to replace my redis broker with an SQS broker. While googling it, I came across many pages which tell how to use SQS with celery. As per my understanding, it creates its own SQS queue. I have only one task and want to use an already created SQS queue.
By default, celery will create a new queue for you, using the queue name prefix setting if one is defined.
However, if you want to use an existing queue, you can provide its name with the task_default_queue setting. Make sure you don't define the queue name prefix mentioned above in this case, since the prefix is prepended to the queue name.
The commit on 26th Feb, 2020 adds the ability to use predefined queues.
You should be able to use predefined queues by adding the predefined_queues option to CELERY_BROKER_TRANSPORT_OPTIONS:
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'predefined_queues': {
        'HIGH_PRIORITY': {
            'url': 'https://sqs.ap-south-1.amazonaws.com/030221/HGH_PRIORITY',
            'access_key_id': config('AWS_ACCESS_KEY'),
            'secret_access_key': config('AWS_SECRET_KEY'),
        },
    }
}
Following is the documentation update from the commit -
Other Features supported by this transport:
Predefined Queues:
The default behavior of this transport is to use a single AWS credential
pair in order to manage all SQS queues (e.g. listing queues, creating
queues, polling queues, deleting messages).
If it is preferable for your environment to use multiple AWS credentials, you
can use the 'predefined_queues' setting inside the 'transport_options' map.
This setting allows you to specify the SQS queue URL and AWS credentials for
each of your queues. For example, if you have two queues (which both already
exist in AWS) you can tell this transport about them as follows:
transport_options = {
    'predefined_queues': {
        'queue-1': {
            'url': 'https://sqs.us-east-1.amazonaws.com/xxx/aaa',
            'access_key_id': 'a',
            'secret_access_key': 'b',
        },
        'queue-2': {
            'url': 'https://sqs.us-east-1.amazonaws.com/xxx/bbb',
            'access_key_id': 'c',
            'secret_access_key': 'd',
        },
    }
}
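When several existing queues share one credential pair, the mapping above can be generated rather than typed out. The following is only a sketch: predefined_queue_options is a hypothetical helper name, and the URL and keys are the placeholders from the documentation example. My understanding is that the queue name a task is routed to (for example task_default_queue) then has to match one of the predefined names.

```python
def predefined_queue_options(queue_urls, access_key_id, secret_access_key):
    """Build transport options for queues that already exist in AWS.

    queue_urls maps the queue name Celery will use to its SQS queue URL.
    (Hypothetical helper, not part of Celery or kombu.)
    """
    return {
        "predefined_queues": {
            name: {
                "url": url,
                "access_key_id": access_key_id,
                "secret_access_key": secret_access_key,
            }
            for name, url in queue_urls.items()
        }
    }


# Placeholder URL and credentials from the documentation example.
options = predefined_queue_options(
    {"queue-1": "https://sqs.us-east-1.amazonaws.com/xxx/aaa"},
    access_key_id="a",
    secret_access_key="b",
)

# Assumed wiring in a Celery config module:
# broker_transport_options = options
# task_default_queue = "queue-1"  # assumed to need a predefined name
```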
You could set the queue name prefix via broker_transport_options (in celery 4.0) like:
broker_transport_options = {"queue_name_prefix": "my-queue-"}
Documentation is here
If you have already created a queue in SQS and its name is "my_super_queue", then to use it in celery you should define the configuration as follows:
broker_url = f"sqs://{aws_access_key}:{aws_secret_key}@"
result_backend = 'file://results' # Or whatever option but 'rpc'
task_default_queue = "my_super_queue"
broker_transport_options = {
    'visibility_timeout': 100,  # YOU DECIDE THIS NUMBER
    'region': 'us-west-2',  # DON'T FORGET THIS
}
Remember to use the credentials of the user you are logging in with, and remember to give that user the correct permissions (in my case I gave it AmazonSQSFullAccess).
By giving the credentials (access key and secret key) you don't need to specify any URL in the broker_url. This is because, when connecting with the given credentials, you have access to your list of SQS queues. Celery will try to use the existing queue specified in task_default_queue and, if it does not find it, it will create it.
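One detail worth adding when the keys go into broker_url: a secret key can contain characters such as "/" or "+" that are not safe in a URL, so it should be URL-encoded first. The sketch below uses the standard library's urllib.parse.quote for that; sqs_broker_url is a hypothetical helper name and the keys are made-up examples.

```python
from urllib.parse import quote


def sqs_broker_url(aws_access_key, aws_secret_key):
    """Return an sqs:// broker URL with both keys percent-encoded."""
    # safe="" also encodes "/", which quote() leaves alone by default.
    return "sqs://{}:{}@".format(
        quote(aws_access_key, safe=""),
        quote(aws_secret_key, safe=""),
    )


# Made-up credentials, for illustration only.
broker_url = sqs_broker_url("AKIAEXAMPLE", "abc/def+ghi")
print(broker_url)
```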
I didn't specify a value for queue_name_prefix here (inside broker_transport_options), but if you do, the final name of the queue to use (or to create) will be queue_name_prefix followed by task_default_queue.
Note that if the queue is an SQS FIFO queue, its name must end with ".fifo", so in this case it would be my_super_queue.fifo.
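To double-check which queue name you should expect to see in the AWS console, the naming rule described above can be written down in a couple of lines. This is just a sketch of that description (prefix followed by the default queue name); resolved_queue_name and is_fifo_name are hypothetical helpers, and the transport may additionally sanitize some characters in the name.

```python
def resolved_queue_name(task_default_queue, queue_name_prefix=""):
    """Name of the queue expected to be used or created:
    the prefix (if any) followed by the default queue name."""
    return f"{queue_name_prefix}{task_default_queue}"


def is_fifo_name(queue_name):
    """SQS FIFO queue names must end with '.fifo'."""
    return queue_name.endswith(".fifo")


plain = resolved_queue_name("my_super_queue")
prefixed = resolved_queue_name("my_super_queue.fifo", "my-queue-")
print(plain, prefixed, is_fifo_name(prefixed))
```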
I have used the code below for queue_name_prefix. It is used in Flask and Django apps running in production.
from celery import Celery


def make_celery(app):
    celery = Celery(
        app.import_name,
        broker="sqs://",
        broker_transport_options={
            "queue_name_prefix": "{SERVICE_ENV}-{SERVICE_NAME}-"
        },
    )
    task_base = celery.Task

    class ContextTask(task_base):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return task_base.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery
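As written, "{SERVICE_ENV}-{SERVICE_NAME}-" is a plain string, so the braces look like placeholders to be filled in. One possible way to fill them, assuming (and this is only an assumption) that SERVICE_ENV and SERVICE_NAME are environment variables; the fallback values "dev" and "app" are also made up:

```python
import os


def queue_name_prefix(env=None):
    """Fill the prefix template from environment variables.

    env defaults to os.environ; passing a dict makes this easy to test.
    """
    env = os.environ if env is None else env
    return "{SERVICE_ENV}-{SERVICE_NAME}-".format(
        SERVICE_ENV=env.get("SERVICE_ENV", "dev"),    # assumed fallback
        SERVICE_NAME=env.get("SERVICE_NAME", "app"),  # assumed fallback
    )


prefix = queue_name_prefix({"SERVICE_ENV": "prod", "SERVICE_NAME": "billing"})
print(prefix)
```

The result could then be passed as the "queue_name_prefix" value in broker_transport_options.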