Adding extra celery configs to Airflow
Asked Answered
S

2

5

Anyone know where I can add extra celery configs to airflow celery executor? For instance I want http://docs.celeryproject.org/en/latest/userguide/configuration.html#worker-pool-restarts this property but how do I allow extra celery properties..

Song answered 7/7, 2017 at 21:21 Comment(0)
C
9

Use the just-released Airflow 1.9.0 and this is now configurable.

In airflow.cfg there is this line:

# Import path for celery configuration options
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG

which points to a python file from the import path. The current default version can is https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/config_templates/default_celery.py

If you need a setting that isn't tweakable via that file then create a new module, say 'my_celery_config.py':

CELERY_CONFIG = {
    # ....
}

and put it in your AIRFLOW_HOME dir (i.e. along side the dags/ folder) and then set celery_config_options = my_celery_config.CELERY_CONFIG in the config.

Cathedral answered 3/1, 2018 at 10:5 Comment(2)
expanding on your answer: from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG CELERY_CONFIG = dict(DEFAULT_CELERY_CONFIG, **{key1: value1, ...}) will give you all the regular defaults, with your values overridingSkewbald
This isn't working for me in the newest version of airflowRickart
P
1

In case you're running Airflow in Docker and you want to change the Celery configuration, you need to do the following:

  1. Create an Airflow config folder (if you don't have one already) at the same level where your dags folder is and add a custom celery configuration file (e.g. custom_celery_config.py) there.

  2. Change the default Celery configuration in the custom_celery_config.py. The idea is that this python script should contain a variable, which contains the default Celery configuration plus your changes to it. E.g. if you like to change the task_queues configuration of Celery, your custom_celery_config.py should look like this:

    from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
    from kombu import Exchange, Queue
    
     CELERY_TASK_QUEUES = [
         Queue('task1', Exchange('task1', type='direct'), routing_key='task1', queue_arguments={'x-max-priority': 8}),
         Queue('task2', Exchange('task2', type='direct'), routing_key='task2', queue_arguments={'x-max-priority': 6}),
     ]
    
     CELERY_CONFIG = {
         **DEFAULT_CELERY_CONFIG,
         "task_queues": CELERY_TASK_QUEUES
     }
    
  3. Mount the config folder in the docker-compose.yml:

    volumes:
        - /data/airflow/config:/opt/airflow/config
    
  4. Set the Celery configuration in the docker-compose.yml like this (since Docker can see the config folder, it can access your custom_celery_config.py):

    AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS: 'custom_celery_config.CELERY_CONFIG'
    
  5. Restart the Airflow Webserver, Scheduler etc.

Reference: here.

For more info about the Celery configuration check this documentation.

Prate answered 27/7, 2022 at 13:32 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.