How can I specify an SQS queue name in Celery
Asked Answered

5

9

I need to replace my Redis broker with an SQS broker. While googling, I came across many pages explaining how to use SQS with Celery. As I understand it, Celery creates its own SQS queue. I have only one task, and I want to use an already-created SQS queue.

Cuttlefish answered 28/3, 2017 at 6:29 Comment(1)
Hi Pramod, did you find a solution for this?Notary
8

By default, Celery will create a new queue for you, using the Queue Prefix setting if it is defined.

However, if you want to use an existing queue, you can provide its name with the task_default_queue setting. Make sure you don't also define the queue prefix mentioned above in this case.
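A minimal config sketch along these lines (the queue name and region are hypothetical, and credentials are assumed to come from the environment):

```python
# celeryconfig.py sketch -- assumes the SQS queue already exists in AWS
broker_url = "sqs://"  # AWS credentials picked up from the environment
task_default_queue = "my-existing-queue"  # name of the pre-created SQS queue

# note: no "queue_name_prefix" here, so the name above is used verbatim
broker_transport_options = {"region": "us-east-1"}
```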

Sepia answered 20/6, 2017 at 6:35 Comment(1)
didn't work for meLabarbera
4

The commit on 26 Feb 2020 added the ability to use predefined queues.

You should be able to use predefined queues by adding the predefined_queues option to CELERY_BROKER_TRANSPORT_OPTIONS:

CELERY_BROKER_TRANSPORT_OPTIONS = {
    'predefined_queues': {
        'HIGH_PRIORITY': {
            'url': 'https://sqs.ap-south-1.amazonaws.com/030221/HGH_PRIORITY',
            'access_key_id': config('AWS_ACCESS_KEY'),
            'secret_access_key': config('AWS_SECRET_KEY'),
        },
    },
}
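One detail worth checking when wiring this up (a sketch under the assumption that the predefined queue should also be the default task queue; the region, account ID, and credentials below are placeholders):

```python
# sketch: pairing predefined_queues with task_default_queue
# (region, account ID, and credentials are hypothetical placeholders)
CELERY_TASK_DEFAULT_QUEUE = "HIGH_PRIORITY"

CELERY_BROKER_TRANSPORT_OPTIONS = {
    "region": "ap-south-1",
    "predefined_queues": {
        "HIGH_PRIORITY": {
            "url": "https://sqs.ap-south-1.amazonaws.com/030221/HIGH_PRIORITY",
            "access_key_id": "AKIA-PLACEHOLDER",
            "secret_access_key": "SECRET-PLACEHOLDER",
        },
    },
}

# every queue Celery routes to must be a key in predefined_queues,
# otherwise the transport has no URL to send messages to
assert CELERY_TASK_DEFAULT_QUEUE in CELERY_BROKER_TRANSPORT_OPTIONS["predefined_queues"]
```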

Following is the documentation update from the commit:

Other Features supported by this transport:
  Predefined Queues:
    The default behavior of this transport is to use a single AWS credential
    pair in order to manage all SQS queues (e.g. listing queues, creating
    queues, polling queues, deleting messages).
    If it is preferable for your environment to use multiple AWS credentials, you
    can use the 'predefined_queues' setting inside the 'transport_options' map.
    This setting allows you to specify the SQS queue URL and AWS credentials for
    each of your queues. For example, if you have two queues which both already
    exist in AWS, you can tell this transport about them as follows:
    transport_options = {
      'predefined_queues': {
        'queue-1': {
          'url': 'https://sqs.us-east-1.amazonaws.com/xxx/aaa',
          'access_key_id': 'a',
          'secret_access_key': 'b',
        },
        'queue-2': {
          'url': 'https://sqs.us-east-1.amazonaws.com/xxx/bbb',
          'access_key_id': 'c',
          'secret_access_key': 'd',
        },
      }
    }
Cymbal answered 2/4, 2020 at 12:12 Comment(0)
2

You could set a queue name prefix via broker_transport_options (in Celery 4.0) like:

broker_transport_options = {"queue_name_prefix": "my-queue-"}

Documentation is here

Nerveless answered 28/3, 2017 at 7:7 Comment(1)
This is the prefix of the queue name (that will be used to create a SQS queue). Is there a way to use existing queue ?Sepia
1

If you have already created a queue in SQS named "my_super_queue", then to use it in Celery you should define the configuration as follows:

broker_url = f"sqs://{aws_access_key}:{aws_secret_key}@"
result_backend = 'file://results'  # any result backend except 'rpc'
task_default_queue = "my_super_queue"

broker_transport_options = {
    'visibility_timeout': 100,  # choose a value that suits your tasks
    'region': 'us-west-2',  # don't forget this
}

Remember to use the credentials of the user you are logging in with, and remember to give that user the correct permissions (in my case I granted AmazonSQSFullAccess).

By providing the credentials (access and secret key) you don't need to specify any URL in the broker_url, because with those credentials Celery has access to your list of SQS queues. It will try to use the existing queue specified in task_default_queue and, if it does not find it, will create it.

I didn't specify a value for queue_name_prefix (inside broker_transport_options) here, but if you do, the final queue name (to use or to create) will be the concatenation of queue_name_prefix followed by task_default_queue.

Note that if the queue is an SQS FIFO queue, its name must end with ".fifo", so in this case it would be my_super_queue.fifo.
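Putting the naming rules together, here is a quick sanity check of the final queue name (the prefix value is hypothetical):

```python
# hypothetical values for illustration
queue_name_prefix = "prod-"
task_default_queue = "my_super_queue.fifo"

# final name = queue_name_prefix followed by task_default_queue
final_queue_name = queue_name_prefix + task_default_queue
print(final_queue_name)  # -> prod-my_super_queue.fifo

# FIFO queues must keep the ".fifo" suffix after prefixing
assert final_queue_name.endswith(".fifo")
```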

Tenotomy answered 16/2, 2022 at 12:45 Comment(0)
1

I have used the code below for queue_name_prefix. It is running in production in both a Flask and a Django app.

from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        broker="sqs://",
        broker_transport_options={
            # SERVICE_ENV / SERVICE_NAME are defined elsewhere
            # (e.g. read from the environment), giving a prefix
            # like "prod-billing-"
            "queue_name_prefix": f"{SERVICE_ENV}-{SERVICE_NAME}-"
        },
    )
    task_base = celery.Task

    # run every task inside the Flask application context
    class ContextTask(task_base):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return task_base.__call__(self, *args, **kwargs)

    celery.Task = ContextTask

    return celery
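The prefix is presumably meant to be interpolated from environment or config values; a quick sketch of the resulting queue name (the values below are hypothetical):

```python
import os

# hypothetical environment values for illustration
os.environ.setdefault("SERVICE_ENV", "prod")
os.environ.setdefault("SERVICE_NAME", "billing")

queue_name_prefix = f"{os.environ['SERVICE_ENV']}-{os.environ['SERVICE_NAME']}-"
default_queue = "celery"  # Celery's default queue name

# the SQS queue Celery creates/uses is prefix + queue name
print(queue_name_prefix + default_queue)  # -> prod-billing-celery
```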
Schist answered 1/9, 2023 at 5:21 Comment(0)
