Airflow DAG PythonOperator task fails with error "Negsignal.SIGKILL"

I am running Airflow v1.10.15 on Cloud Composer v1.16.16.

My DAG looks like this:

# imports
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from scripts import workday_extract, workday_config_large

logger = logging.getLogger(__name__)

default_args = {
    'owner': 'xxxx',
    'depends_on_past': False,
    'start_date': datetime(2021, 9, 14),
    'email_on_failure': True,
    'email': ['xxxx'],
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
    'catchup': False
}


# Define the DAG with parameters
dag = DAG(
    dag_id='xxxx_v1',
    schedule_interval='0 20 * * *',
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    concurrency=1
)

def wd_to_bq(key, val, **kwargs):
    logger.info("workday to BQ ingestion")
    workday_extract.fetch_wd_load_bq(key, val)


start_load = DummyOperator(task_id='start', dag=dag)

end_load = DummyOperator(task_id='end', dag=dag)

for key, val in workday_config_large.endpoint_tbl_mapping.items():
    # One extraction task per Workday endpoint/table mapping
    workday_to_bq = PythonOperator(
        dag=dag,
        task_id=f'{key}',
        execution_timeout=timedelta(minutes=60),
        provide_context=True,
        python_callable=wd_to_bq,
        op_kwargs={'key': key, 'val': val}
    )
    start_load >> workday_to_bq >> end_load

The task fails with the error Task exited with return code Negsignal.SIGKILL. The Python script runs fine on my local machine and completes in 15 minutes. There are multiple endpoints from which the reports are extracted. However, the one that takes the longest (~15 minutes) fails with this error and the others succeed.

I have tried a lot of options but none seem to work. Can someone help with this?

Barrault answered 18/9, 2021 at 5:15 Comment(2)
Cloud Composer gives you a monitoring dashboard. I'd suggest running only the task that fails and checking the memory and CPU pressure on the Airflow worker during that time. That'll tell you which resources you need to increase.Nifty
If my answer addressed your question, please consider accepting and upvoting it. If not, let me know so that I can improve my answer.Vookles

I resolved the issue by increasing the memory size.

https://github.com/apache/airflow/issues/10435

You should check the memory usage of the pod that acts as the worker while the task is running.
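
If the task finishes on a smaller run, one way to see how close it gets to the limit is to log the peak memory of the task process itself. A minimal sketch, reusing the question's wd_to_bq callable (resource.getrusage reports ru_maxrss in kilobytes on Linux):

import logging
import resource

from scripts import workday_extract

logger = logging.getLogger(__name__)

def wd_to_bq(key, val, **kwargs):
    workday_extract.fetch_wd_load_bq(key, val)
    # Peak resident set size of this worker process; kilobytes on Linux
    peak_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    logger.info("Peak memory while loading %s: %.0f MB", key, peak_kb / 1024)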

Omnirange answered 18/9, 2021 at 7:42 Comment(3)
A link-only answer is useless. Can you elaborate on this a little more?Fact
The data file to be downloaded is around 100 MB. I am using a cluster of 3 nodes, each n1-standard-1, i.e. 3.75 GB RAM. Is that not enough? Should I increase it to 7.5 GB?Barrault
Did you do what Greg@ suggested? This error is related to resources, so you just need to increase the resources. However, this is a known issue in Airflow and they will post an official solution when they have one.Neelyneeoma

A message like the one below in your Airflow task logs suggests that the kernel/OS killed your process. SIGKILL (signal 9) is a directive to kill the process immediately.

{{local_task_job.py:102}} INFO - Task exited with return code Negsignal.SIGKILL
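
As an aside (not Airflow-specific), a negative return code is simply how Python reports a child process that was killed by a signal; Airflow renders it as Negsignal.SIGKILL in the log line above. A minimal sketch on a POSIX system:

import signal
import subprocess

# Start a long-running child process, then kill it with SIGKILL (signal 9).
proc = subprocess.Popen(["sleep", "60"])
proc.send_signal(signal.SIGKILL)
print(proc.wait())  # prints -9: the negative of the signal number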

It is very likely that the task you are performing (in this case the wd_to_bq function behind the workday_to_bq task) was using a lot of resources on the worker container. I'm assuming that you are ingesting and processing some data, which can be memory intensive.

You've mentioned that it works locally but fails in Cloud Composer. This could be because you have more RAM on your local system, or because your Cloud Composer Airflow workers are processing other DAGs that are hogging the worker memory. To confirm that this is a memory issue, check the monitoring dashboard provided by the cloud service.

Airflow runs its tasks on workers, so you will have to give the workers better hardware. Try increasing the RAM.

  • In the case of fully managed services like Cloud Composer or MWAA, the cloud provider should allow you to increase the underlying hardware.
  • If you are using Docker, Docker Desktop, Swarm, or Kubernetes, check what memory limit the worker container/pod is set to. This can then be increased accordingly in the compose/manifest files.

Note that the purpose of Airflow is to schedule ETL tasks and orchestrate the pipeline. You shouldn't be loading high volumes of data onto the Airflow workers and using all of their CPU/memory. This will slow down your entire Airflow environment or SIGKILL your DAGs seemingly at random. In most cases only the DAG/process that is using too much memory will be killed by the OOM killer, but sometimes it can also kill other DAGs/processes on the same worker.

For loading/processing/writing large amounts of data, use ETL tools like Fivetran, Airbyte, Databricks, NiFi, Azure Data Factory, etc., and use Airflow for scheduling and orchestration.

Snakebite answered 6/8, 2022 at 12:14 Comment(1)
This is a far better answer than the accepted answer!! My problem was not due to any particular memory-intensive process, but rather that I am launching a massive number of parallel jobs: 50000 small calculations + pub/sub messages in an async fashion. Sometimes the job would fail and need to restart.Pasticcio

This error occurs when the allocated resources are less than what is required. DAG execution is RAM-limited, and more memory can be consumed depending on the DAG's nature, so it is preferable to use machine types with more memory. Since you are using Cloud Composer 1, autoscaling of the resources is not possible, so you will need to increase the worker resources yourself.

Vookles answered 18/9, 2021 at 5:16 Comment(0)

I had this issue too, but took a different approach.

Have you considered how your script could use less memory, or use memory more efficiently, instead of simply increasing the available memory?

import pandas as pd  # db_connector_warehouse / offers_table come from the author's own project

def stream_offers(chunk_size=10_000):  # illustrative wrapper; yield only works inside a generator
    with db_connector_warehouse.create_session() as session:
        query = session.query(offers_table)\
            .yield_per(chunk_size).enable_eagerloads(False)

        for df in pd.read_sql(query.statement, session.bind, chunksize=chunk_size):
            yield df

In the above example, the bottom part - passing chunksize to pandas - will have it hand back the dataframe in smaller chunks; however, by itself pandas still loads everything into memory first and then gives you the part you requested (for read_sql, and likely other loading functions such as csv / xlsx, but I haven't looked into this).

So you must ensure that you don't load the entire dataset: if you are using SQLAlchemy's ORM you need to use the yield_per param, and for plain connections you can set the connection to stream the results, as sketched below.
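
For the non-ORM case, a minimal sketch of streaming with SQLAlchemy's stream_results execution option (the connection string and table name are hypothetical); with a server-side cursor, pandas only materializes one chunk at a time:

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:password@host/warehouse")  # hypothetical DSN

# stream_results asks the driver for a server-side cursor instead of
# buffering the full result set in memory
with engine.connect().execution_options(stream_results=True) as conn:
    for df in pd.read_sql("SELECT * FROM offers", conn, chunksize=10_000):
        print(f"processing {len(df)} rows")  # replace with real per-chunk work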

A couple of useful resources if you'd rather go down the route of using less memory:

How to create a large pandas dataframe from an sql query without running out of memory?

https://pythonspeed.com/articles/pandas-sql-chunking/

And if you're not familiar with yield-based flow control: What does the "yield" keyword do?

Reptilian answered 4/4, 2022 at 8:19 Comment(0)

I had this happen when I was using a ThreadPoolExecutor, which doesn't release any resources until all the futures are done. To prevent the errors, I switched to processing four elements at a time:

import itertools
from concurrent.futures import ThreadPoolExecutor

documents_iter = iter(documents)  # documents and TextScraper come from the surrounding project
while True:
    # islice returns an iterator (always truthy), so materialize the chunk to detect the end
    chunk = list(itertools.islice(documents_iter, 4))
    if not chunk:
        break
    with ThreadPoolExecutor(max_workers=4) as executor:
        for each in executor.map(TextScraper(), chunk):
            pass
Mojave answered 17/4, 2022 at 4:57 Comment(0)

I ran Airflow in Kubernetes, allocated a separate server with 25 GB of RAM for the worker, and set no resource restrictions. After launching, the DAG crashed after a few minutes, at which point the Airflow worker had taken all available memory. The problem was the large amount of data (a database table, 4 GB, 17 million rows) that it was trying to work with. After manually deleting half of the data from the database, the DAG ran successfully.

Brodeur answered 14/3 at 19:27 Comment(1)
Your answer could be improved with additional supporting information. Please edit to add further details, such as citations or documentation, so that others can confirm that your answer is correct. You can find more information on how to write good answers in the help center.Clamatorial
