Does anyone know where I can add extra Celery configs to the Airflow Celery executor? For instance, I want to set the worker_pool_restarts property (http://docs.celeryproject.org/en/latest/userguide/configuration.html#worker-pool-restarts), but how do I pass extra Celery properties?
Use the just-released Airflow 1.9.0 and this is now configurable.
In airflow.cfg there is this line:
# Import path for celery configuration options
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
which is an import path to a dict defined in a Python module. The current default version is at https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/config_templates/default_celery.py
If you need a setting that isn't tweakable via that file then create a new module, say 'my_celery_config.py':
CELERY_CONFIG = {
# ....
}
and put it in your AIRFLOW_HOME dir (i.e. alongside the dags/ folder), then set celery_config_options = my_celery_config.CELERY_CONFIG in the config.
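For the worker_pool_restarts setting from the question, my_celery_config.py could look roughly like the sketch below: copy the defaults and add the extra key. The try/except fallback and its stand-in values are assumptions added only so the snippet also runs where Airflow is not installed; in a real deployment the plain import is all you need.

```python
# my_celery_config.py (sketch)
try:
    from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
except ImportError:
    # Assumption: stand-in so the sketch runs without Airflow installed.
    # These values are illustrative, not Airflow's real defaults.
    DEFAULT_CELERY_CONFIG = {"worker_concurrency": 16, "task_acks_late": True}

# Shallow copy, so the shared defaults dict is left untouched.
CELERY_CONFIG = dict(DEFAULT_CELERY_CONFIG)

# The extra Celery setting asked about in the question.
CELERY_CONFIG["worker_pool_restarts"] = True
```

Copying first rather than editing DEFAULT_CELERY_CONFIG in place keeps the defaults intact for anything else that imports them.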
In case you're running Airflow in Docker and you want to change the Celery configuration, you need to do the following:

1. Create an Airflow config folder (if you don't have one already) at the same level where your dags folder is and add a custom celery configuration file (e.g. custom_celery_config.py) there.

2. Change the default Celery configuration in the custom_celery_config.py. The idea is that this python script should contain a variable, which contains the default Celery configuration plus your changes to it. E.g. if you like to change the task_queues configuration of Celery, your custom_celery_config.py should look like this:

    from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
    from kombu import Exchange, Queue

    CELERY_TASK_QUEUES = [
        Queue('task1', Exchange('task1', type='direct'), routing_key='task1',
              queue_arguments={'x-max-priority': 8}),
        Queue('task2', Exchange('task2', type='direct'), routing_key='task2',
              queue_arguments={'x-max-priority': 6}),
    ]

    CELERY_CONFIG = {
        **DEFAULT_CELERY_CONFIG,
        "task_queues": CELERY_TASK_QUEUES
    }

3. Mount the config folder in the docker-compose.yml:

    volumes:
      - /data/airflow/config:/opt/airflow/config

4. Set the Celery configuration in the docker-compose.yml like this (since Docker can see the config folder, it can access your custom_celery_config.py):

    AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS: 'custom_celery_config.CELERY_CONFIG'

5. Restart the Airflow Webserver, Scheduler etc.
For more info about the Celery configuration check this documentation.
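Since the celery_config_options value is an import path, a quick way to catch a bad mount or a typo before restarting the services is to resolve that path yourself. The following is a minimal sketch that mimics such a lookup with the standard library; it is not Airflow's own loader, and the function names resolve_dotted_path and check_celery_config are hypothetical.

```python
import importlib


def resolve_dotted_path(dotted_path):
    """Import 'package.module.ATTR' and return the ATTR object."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"{dotted_path!r} is not of the form 'module.ATTRIBUTE'")
    # Raises ImportError if the module is not on sys.path.
    module = importlib.import_module(module_path)
    # Raises AttributeError if the variable name is misspelled.
    return getattr(module, attr_name)


def check_celery_config(dotted_path):
    """Return the resolved object, raising early if it is not a dict."""
    config = resolve_dotted_path(dotted_path)
    if not isinstance(config, dict):
        raise TypeError(
            f"{dotted_path} resolved to {type(config).__name__}, expected dict"
        )
    return config
```

Run inside the container, with the mounted config folder on the Python path, check_celery_config("custom_celery_config.CELERY_CONFIG") should return the merged dict; an ImportError there suggests the folder is not mounted or not importable.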
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
CELERY_CONFIG = dict(DEFAULT_CELERY_CONFIG, **{key1: value1, ...})
will give you all the regular defaults, with your values overriding them. – Skewbald
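One thing to watch with this pattern: the merge is shallow, so overriding a key whose value is itself a dict replaces the whole nested dict. The sketch below illustrates this with stand-in dicts; the keys and values are illustrative assumptions, not Airflow's real defaults. Note also that dict(defaults, **overrides) only works when every override key is a string.

```python
# Stand-in for DEFAULT_CELERY_CONFIG; values are illustrative only.
defaults = {
    "worker_concurrency": 16,
    "broker_transport_options": {"visibility_timeout": 21600},
}
# Hypothetical overrides, chosen to show the nested-dict case.
overrides = {
    "worker_concurrency": 4,
    "broker_transport_options": {"max_retries": 3},
}

# Both spellings build a new dict in which overrides win.
merged_call = dict(defaults, **overrides)
merged_unpack = {**defaults, **overrides}

# Shallow merge: the nested defaults are gone after the override.
lost_nested_default = "visibility_timeout" not in merged_call["broker_transport_options"]

# To keep nested defaults, merge that key explicitly.
merged_nested = {
    **defaults,
    **overrides,
    "broker_transport_options": {
        **defaults["broker_transport_options"],
        **overrides["broker_transport_options"],
    },
}
```

Neither form mutates the defaults dict, so either is safe to use in a config module.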