I am running Airflow v1.10.15 on Cloud Composer v1.16.16.
My DAG looks like this:
# imports
from datetime import datetime, timedelta
import logging

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

from scripts import workday_extract, workday_config_large

logger = logging.getLogger(__name__)
default_args = {
    'owner': 'xxxx',
    'depends_on_past': False,
    'start_date': datetime(2021, 9, 14),
    'email_on_failure': True,
    'email': ['xxxx'],
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
    'catchup': False
}
# Define the DAG with parameters
dag = DAG(
    dag_id='xxxx_v1',
    schedule_interval='0 20 * * *',
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    concurrency=1
)
def wd_to_bq(key, val, **kwargs):
    logger.info("workday to BQ ingestion")
    workday_extract.fetch_wd_load_bq(key, val)
start_load = DummyOperator(task_id='start', dag=dag)
end_load = DummyOperator(task_id='end', dag=dag)
# One extraction task per Workday report endpoint
for key, val in workday_config_large.endpoint_tbl_mapping.items():
    workday_to_bq = PythonOperator(
        dag=dag,
        task_id=f'{key}',
        execution_timeout=timedelta(minutes=60),
        provide_context=True,
        python_callable=wd_to_bq,
        op_kwargs={'key': key, 'val': val}
    )
    start_load >> workday_to_bq >> end_load
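For context, workday_config_large.endpoint_tbl_mapping is just a dict mapping each Workday report endpoint to its target BigQuery table, along these lines (values anonymised, not my real config):

# scripts/workday_config_large.py (anonymised sketch)
endpoint_tbl_mapping = {
    'workers': 'project.dataset.wd_workers',
    'time_off': 'project.dataset.wd_time_off',
    # ... one entry per Workday report endpoint
}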
The task fails with the error Task exited with return code Negsignal.SIGKILL. The Python script runs fine on my local machine and completes in about 15 minutes. Reports are extracted from multiple endpoints, and only the one that takes longest (~15 minutes) fails with this error; all the others succeed.
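The extraction function roughly does the following. This is a simplified sketch, not the exact script; the base URL, credentials and variable names below are placeholders for my anonymised setup:

# Simplified sketch of scripts/workday_extract.py (placeholders, not the real code)
import requests
from google.cloud import bigquery

WORKDAY_BASE_URL = 'https://<tenant>.workday.com/ccx/service/customreport2/'  # placeholder
WD_USER = 'xxxx'      # placeholder credentials
WD_PASSWORD = 'xxxx'

def fetch_wd_load_bq(key, val):
    # Fetch the whole report for this endpoint into memory in one request
    resp = requests.get(WORKDAY_BASE_URL + key,
                        auth=(WD_USER, WD_PASSWORD),
                        params={'format': 'json'})
    resp.raise_for_status()
    rows = resp.json()['Report_Entry']

    # Load all rows into the target table in a single BigQuery load job
    client = bigquery.Client()
    job = client.load_table_from_json(rows, val)  # val is 'project.dataset.table'
    job.result()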
I have tried a number of options, but none of them seem to work. Can someone help with this?