Airflow Logs BrokenPipeException
I'm using a clustered Airflow environment with four AWS EC2 instances for the servers:

  • Server 1: Webserver, Scheduler, Redis Queue, PostgreSQL Database
  • Server 2: Webserver
  • Server 3: Worker
  • Server 4: Worker

My setup has been working fine for three months now, but sporadically, about once a week, I get a BrokenPipeException when Airflow attempts to log something.

*** Log file isn't local.
*** Fetching here: http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-13T00:00:00/1.log

[2018-07-16 00:00:15,521] {cli.py:374} INFO - Running on host ip-1-2-3-4
[2018-07-16 00:00:15,698] {models.py:1197} INFO - Dependencies all met for <TaskInstance: foobar.task_1 2018-07-13 00:00:00 [queued]>
[2018-07-16 00:00:15,710] {models.py:1197} INFO - Dependencies all met for <TaskInstance: foobar.task_1 2018-07-13 00:00:00 [queued]>
[2018-07-16 00:00:15,710] {models.py:1407} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2018-07-16 00:00:15,719] {models.py:1428} INFO - Executing <Task(OmegaFileSensor): task_1> on 2018-07-13 00:00:00
[2018-07-16 00:00:15,720] {base_task_runner.py:115} INFO - Running: ['bash', '-c', 'airflow run foobar task_1 2018-07-13T00:00:00 --job_id 1320 --raw -sd DAGS_FOLDER/datalake_digitalplatform_arl_workflow_schedule_test_2.py']
[2018-07-16 00:00:16,532] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,532] {configuration.py:206} WARNING - section/key [celery/celery_ssl_active] not found in config
[2018-07-16 00:00:16,532] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,532] {default_celery.py:41} WARNING - Celery Executor will run without SSL
[2018-07-16 00:00:16,534] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,533] {__init__.py:45} INFO - Using executor CeleryExecutor
[2018-07-16 00:00:16,597] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,597] {models.py:189} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags/datalake_digitalplatform_arl_workflow_schedule_test_2.py
[2018-07-16 00:00:16,768] {cli.py:374} INFO - Running on host ip-1-2-3-4
[2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - --- Logging error ---

[2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - Traceback (most recent call last):

[2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING -   File "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
    self.flush()

[2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING -   File "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
    self.stream.flush()

[2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - BrokenPipeError: [Errno 32] Broken pipe

[2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - Call stack:

[2018-07-16 00:16:24,933] {logging_mixin.py:84} WARNING -   File "/usr/bin/airflow", line 27, in <module>
    args.func(args)

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in _run_raw_task
    result = task_copy.execute(context=context)

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 78, in execute
    while not self.poke(context):

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING -   File "/home/ec2-user/airflow/plugins/custom_plugins.py", line 35, in poke
    directory = os.listdir(full_path)

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 36, in handle_timeout
    self.log.error("Process timed out")

[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - Message: 'Process timed out'
Arguments: ()

[2018-07-16 00:16:24,942] {models.py:1595} ERROR - Timeout
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 78, in execute
    while not self.poke(context):
  File "/home/ec2-user/airflow/plugins/custom_plugins.py", line 35, in poke
    directory = os.listdir(full_path)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 37, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout
[2018-07-16 00:16:24,942] {models.py:1624} INFO - Marking task as FAILED.
[2018-07-16 00:16:24,956] {models.py:1644} ERROR - Timeout

Sometimes the error will instead say:

*** Log file isn't local.
*** Fetching here: http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-12T00:00:00/1.log
*** Failed to fetch log file from worker. 404 Client Error: NOT FOUND for url: http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-12T00:00:00/1.log

I'm not sure why the logs work ~95% of the time but randomly fail the rest of the time. Here are the logging settings in my airflow.cfg file:

# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /home/ec2-user/airflow/logs

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply an Airflow connection id that provides access to the storage
# location.
remote_log_conn_id =
encrypt_s3_logs = False

# Logging level
logging_level = INFO

# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class =

# Log format
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s

# Name of handler to read task instance logs.
# Default to use file task handler.
task_log_reader = file.task

# Log files for the gunicorn webserver. '-' means log to stderr.
access_logfile = -
error_logfile = 

# The amount of time (in secs) webserver will wait for initial handshake
# while fetching logs from other worker machine
log_fetch_timeout_sec = 5

# When you start an airflow worker, airflow starts a tiny web server
# subprocess to serve the workers local log files to the airflow main
# web server, who then builds pages and sends them to users. This defines
# the port on which the logs are served. It needs to be unused, and open
# visible from the main web server to connect into the workers.
worker_log_server_port = 8793

# How often should stats be printed to the logs
print_stats_interval = 30

child_process_log_directory = /home/ec2-user/airflow/logs/scheduler
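One low-risk experiment against the intermittent 404s: per the settings above, the webserver gives up on fetching a worker's log after only 5 seconds. Raising that window at least rules out slow worker responses (the value below is illustrative, not a confirmed fix):

```ini
# airflow.cfg -- give the webserver longer to pull task logs from workers
# before it reports "Failed to fetch log file from worker"
log_fetch_timeout_sec = 30
```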

I'm wondering if I should try a different technique for my logging, such as writing to an S3 bucket, or if there is something else I can do to fix this issue.

Update:

Writing the logs to S3 did not resolve this issue. Also, the error is more consistent now (though still sporadic): it's happening roughly 50% of the time. One thing I noticed is that it occurs on my AWS EMR creation task. Starting an AWS EMR cluster takes about 20 minutes, and then the task has to wait for the Spark commands to run on the EMR cluster, so the single task runs for about 30 minutes. I'm wondering if this is too long for an Airflow task to be running and if that's why it starts failing to write to the logs. If so, I could break up the EMR work so that there is one task for the EMR creation, then another task for the Spark commands on the EMR cluster.
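The split described above could be sketched like this. It is only an illustration of the two-task structure, not Airflow or boto3 API: the `emr` client is injected, and `run_job_flow`/`describe_cluster` stand in for the corresponding boto3 EMR calls.

```python
import time

def create_emr_cluster(emr):
    """Task 1: kick off the cluster and return its id immediately,
    instead of blocking for the ~20 minutes the cluster takes to start."""
    response = emr.run_job_flow()  # stand-in for the real boto3 call
    return response["JobFlowId"]

def wait_for_emr_cluster(emr, cluster_id, timeout_s=1800, interval_s=30):
    """Task 2: poll like a sensor's poke() until the cluster is ready."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        state = emr.describe_cluster(ClusterId=cluster_id)["Cluster"]["Status"]["State"]
        if state == "WAITING":  # ready to accept Spark steps
            return True
        if state in ("TERMINATED", "TERMINATED_WITH_ERRORS"):
            raise RuntimeError("cluster %s died in state %s" % (cluster_id, state))
        time.sleep(interval_s)
    raise TimeoutError("cluster %s not ready after %ss" % (cluster_id, timeout_s))
```

That way each Airflow task stays short-lived, and a failure in the wait step can be retried without re-creating the cluster.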

Note:

I've also created a new bug ticket on Airflow's Jira: https://issues.apache.org/jira/browse/AIRFLOW-2844

Phenolic answered 16/7, 2018 at 16:1 Comment(2)
I've also created a new bug ticket on Airflow's Jira here: issues.apache.org/jira/browse/AIRFLOW-2844 (Phenolic)
FWIW I've had Airflow tasks running for 20 hours without problems, so your task "shouldn't" be too long. I don't have any insight into what is causing this though, sorry. (Otorhinolaryngology)

This issue is a symptom of another issue I just resolved here: AirflowException: Celery command failed - The recorded hostname does not match this instance's hostname.

I didn't see the AirflowException: Celery command failed error for a while because it showed up in the airflow worker logs. It wasn't until I watched the worker logs in real time that I noticed that when that error is thrown, I also get the BrokenPipeException in my task.

It gets somewhat weirder, though. I would only see the BrokenPipeException thrown if I used print("something to log") and the AirflowException: Celery command failed... error happened on the worker node. When I changed all of my print statements to use import logging ... logging.info("something to log"), I no longer saw the BrokenPipeException, but the task would still fail because of the AirflowException: Celery command failed... error. Had I not seen the BrokenPipeException in my Airflow task logs, though, I wouldn't have known why the task was failing, because once I eliminated the print statements I never saw any error in the Airflow task logs (only in the airflow worker logs).

So, long story short, there are a few takeaways.

  1. Don't use print("something to log"). Use Python's built-in logging module, which Airflow's log handlers pick up: import logging, then logging.info("something to log").

  2. If you're using an AWS EC2 instance as your Airflow server, you may be experiencing this issue: https://github.com/apache/incubator-airflow/pull/2484. A fix has already been integrated into Airflow 1.10 (I'm currently using 1.9), so upgrade to 1.10; you can use the command pip install git+git://github.com/apache/incubator-airflow.git@v1-10-stable. If you don't want to upgrade, you can instead follow the steps in the GitHub issue to either manually patch the file with the fix or fork Airflow and cherry-pick the commit that fixes it.

Phenolic answered 10/8, 2018 at 16:14 Comment(5)
The pip install command in #2 will give you the master branch, which is what will become 2.0 once the 1.10 release candidate is finalized. Master is somewhat unstable at times but rarely totally broken. I'd recommend going with the 1.10 stable branch for now, which is up to date with / slightly ahead of the latest 1.10 RC: pip install git+git://github.com/apache/incubator-airflow.git@v1-10-stable (Nauru)
@TaylorEdmiston we're having trouble installing Airflow 1.10 using that command. It successfully installs Airflow, and airflow version reports 1.10, but when I try to start up the scheduler we get an exception saying the MySQLdb module is missing. I've created a new post for this here: #51861953 (Phenolic)
The fix they put in is wrong... github.com/apache/incubator-airflow/pull/2484/commits/… Check out this post: https://mcmap.net/q/1168715/-python-socket-gethostname. Their fix gives us the hostname, not the IP address. We have to use socket.gethostbyname(socket.gethostname()) (Phenolic)
>>> socket.gethostbyname(socket.gethostname()) returns '10.185.143.196', while >>> socket.gethostname() returns 'ip-10-185-143-196' and >>> socket.getfqdn() returns 'ip-10-185-143-196' (Phenolic)
Kyle Bridenstine, with all due respect, I'm having a hard time accepting that using Airflow's logging API instead of print statements resolves this broken pipe exception. We've used Airflow 1.10 extensively, and print statements worked fine for six months; however, we did get the broken-pipe exception recently. Our Airflow instance is heavily used as part of our data pipeline. Since everything has worked with print statements to date, it still seems unclear why it cracks in between. (Cinder)
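The hostname discussion in the comments above boils down to this distinction (the EC2 values shown in the comments are from the commenter's box; a helper like reachable_ip below is hypothetical, just wrapping the two stdlib calls):

```python
import socket

def reachable_ip(hostname=None):
    # socket.gethostname() on EC2 yields a name like 'ip-10-185-143-196';
    # socket.gethostbyname() resolves a name to the numeric address
    # (e.g. '10.185.143.196') that other machines can actually connect to.
    return socket.gethostbyname(hostname or socket.gethostname())
```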
