Airflow task running tweepy exits with return code -6

I have a simple Airflow DAG which has only one task: stream_from_twitter_to_kafka.

Here is the code for the DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# read_stream_of_tweets (shown further below) is the callable the task runs.

default_args = {
    "owner": "me",
    "depends_on_past": False,
    "start_date": datetime(2020, 1, 20),
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=1),
}

NO_OF_TWEETS_TO_STREAM = 100

with DAG("stream_from_twitter",
         catchup=False,
         default_args=default_args,
         schedule_interval="@hourly") as dag:
    task1 = PythonOperator(task_id="stream_from_twitter_to_kafka",
                           python_callable=read_stream_of_tweets,
                           op_args=(NO_OF_TWEETS_TO_STREAM,))




task1

The code for read_stream_of_tweets uses tweepy to read the incoming stream of tweets and publish them to a Kafka topic:

import tweepy

# MyKafkaProducer, TOPIC_PUB and the Twitter credentials (consumer_token,
# consumer_secret, access_token, access_secret) are defined elsewhere in the project.

# override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):
    def __init__(self, *args, **kwargs):
        self.num_tweets = kwargs.pop('num_tweets')
        self.current_num_tweets = 0
        super(MyStreamListener, self).__init__(*args, **kwargs)
        self.kafka_producer = MyKafkaProducer()

    def on_status(self, status):
        if self.current_num_tweets >= self.num_tweets:
            # Limiting to a number.
            return False

        if not status.text.startswith("RT"):
            print(status.text)
            status_info = {
                'id': status.id_str,
                'text': status.text
            }
            self.kafka_producer.publish_message(TOPIC_PUB, value=status_info)
            self.current_num_tweets = self.current_num_tweets + 1

    def on_error(self, status_code):
        if status_code == 420:
            # returning False in on_data disconnects the stream
            return False


def read_stream_of_tweets(n):
    auth = tweepy.OAuthHandler(consumer_token,
                               consumer_secret)

    auth.set_access_token(access_token,
                          access_secret)

    myStreamListener = MyStreamListener(num_tweets=n)
    myStream = tweepy.Stream(auth=auth,
                             listener=myStreamListener)

    myStream.filter(track=['life'], languages=['en'])
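
For reference, here is a hypothetical way to exercise this callable directly, outside Airflow (the __main__ guard is my own addition, not part of the original file), e.g. to sanity-check the Twitter credentials and the Kafka producer:

# Hypothetical standalone check, not part of the original DAG module.
if __name__ == "__main__":
    read_stream_of_tweets(5)  # stream a handful of tweets, then disconnect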

Here is the log of the task:

*** Reading local file: /Users/aneeshmakala/Documents/ComputerScience/datascience/hapPy/airflow/logs/stream_from_twitter/stream_from_twitter_to_kafka/2020-01-20T12:27:48.408593+00:00/1.log
[2020-01-20 17:58:27,264] {base_task_runner.py:61} DEBUG - Planning to run as the  user
[2020-01-20 17:58:27,272] {taskinstance.py:672} DEBUG - <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2020-01-20 17:58:27,272] {taskinstance.py:672} DEBUG - <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2020-01-20 17:58:27,273] {taskinstance.py:672} DEBUG - <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2020-01-20 17:58:27,273] {taskinstance.py:672} DEBUG - <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]> dependency 'Task Instance State' PASSED: True, Task state queued was valid.
[2020-01-20 17:58:27,273] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]>
[2020-01-20 17:58:27,277] {taskinstance.py:672} DEBUG - <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]> dependency 'Pool Slots Available' PASSED: True, ('There are enough open slots in %s to execute the task', 'default_pool')
[2020-01-20 17:58:27,280] {taskinstance.py:672} DEBUG - <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2020-01-20 17:58:27,280] {taskinstance.py:672} DEBUG - <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2020-01-20 17:58:27,280] {taskinstance.py:672} DEBUG - <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]> dependency 'Task Concurrency' PASSED: True, Task concurrency is not set.
[2020-01-20 17:58:27,280] {taskinstance.py:672} DEBUG - <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2020-01-20 17:58:27,280] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [queued]>
[2020-01-20 17:58:27,280] {taskinstance.py:866} INFO -
--------------------------------------------------------------------------------
[2020-01-20 17:58:27,280] {taskinstance.py:867} INFO - Starting attempt 1 of 1
[2020-01-20 17:58:27,280] {taskinstance.py:868} INFO -
--------------------------------------------------------------------------------
[2020-01-20 17:58:27,286] {taskinstance.py:887} INFO - Executing <Task(PythonOperator): stream_from_twitter_to_kafka> on 2020-01-20T12:27:48.408593+00:00
[2020-01-20 17:58:27,288] {standard_task_runner.py:52} INFO - Started process 11912 to run task
[2020-01-20 17:58:27,315] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,314] {cli_action_loggers.py:68} DEBUG - Calling callbacks: [<function default_action_log at 0x10da70830>]
[2020-01-20 17:58:27,326] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,326] {settings.py:213} DEBUG - Setting up DB connection pool (PID 11912)
[2020-01-20 17:58:27,327] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,326] {settings.py:221} DEBUG - settings.configure_orm(): Using NullPool
[2020-01-20 17:58:27,329] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,329] {dagbag.py:403} INFO - Filling up the DagBag from /Users/aneeshmakala/Documents/ComputerScience/datascience/hapPy/airflow/dags/stream_from_twitter.py
[2020-01-20 17:58:27,330] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,330] {dagbag.py:232} DEBUG - Importing /Users/aneeshmakala/Documents/ComputerScience/datascience/hapPy/airflow/dags/stream_from_twitter.py
[2020-01-20 17:58:27,332] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,332] {dagbag.py:370} DEBUG - Loaded DAG <DAG: stream_from_twitter>
[2020-01-20 17:58:27,351] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: stream_from_twitter.stream_from_twitter_to_kafka 2020-01-20T12:27:48.408593+00:00 [running]> 1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.ip6.arpa
[2020-01-20 17:58:27,364] {__init__.py:97} DEBUG - Preparing lineage inlets and outlets
[2020-01-20 17:58:27,364] {__init__.py:133} DEBUG - inlets: [], outlets: []
[2020-01-20 17:58:27,364] {python_operator.py:105} INFO - Exporting the following env vars:
[email protected]
AIRFLOW_CTX_DAG_OWNER=me
AIRFLOW_CTX_DAG_ID=stream_from_twitter
AIRFLOW_CTX_TASK_ID=stream_from_twitter_to_kafka
AIRFLOW_CTX_EXECUTION_DATE=2020-01-20T12:27:48.408593+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2020-01-20T12:27:48.408593+00:00
[2020-01-20 17:58:27,367] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,366] {oauth1_auth.py:77} DEBUG - Signing request <PreparedRequest [POST]> using client <Client client_key=XXXXXX, client_secret=****, resource_owner_key=XXXXXX, resource_owner_secret=****, signature_method=HMAC-SHA1, signature_type=AUTH_HEADER, callback_uri=None, rsa_key=None, verifier=None, realm=None, encoding=utf-8, decoding=None, nonce=None, timestamp=None>
[2020-01-20 17:58:27,368] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,367] {oauth1_auth.py:93} DEBUG - Including body in call to sign: True
[2020-01-20 17:58:27,369] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,369] {__init__.py:133} DEBUG - Collected params: [('delimited', 'length'), ('oauth_nonce', 'XXXXXX'), ('oauth_timestamp', '1579523307'), ('oauth_version', '1.0'), ('oauth_signature_method', 'HMAC-SHA1'), ('oauth_consumer_key', 'XXXXXX'), ('oauth_token', 'XXXXXX'), ('track', 'life'), ('language', 'en')]
[2020-01-20 17:58:27,370] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,370] {__init__.py:137} DEBUG - Normalized params: delimited=length&language=en&oauth_consumer_key=XXXXXX&oauth_nonce=XXXXXX&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1579523307&oauth_token=XXXXXX&oauth_version=1.0&track=life
[2020-01-20 17:58:27,370] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,370] {__init__.py:138} DEBUG - Normalized URI: https://stream.twitter.com/1.1/statuses/filter.json
[2020-01-20 17:58:27,371] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,371] {__init__.py:143} DEBUG - Signing: signature base string: POST&https%3A%2F%2Fstream.twitter.com%2F1.1%2Fstatuses%2Ffilter.json&delimited%3Dlength%26language%3Den%26oauth_consumer_key%3DXXXXXX%26oauth_nonce%3DXXXXXX%26oauth_signature_method%3DHMAC-SHA1%26oauth_timestamp%3D1579523307%26oauth_token%3DXXXXXX%26oauth_version%3D1.0%26track%3Dlife
[2020-01-20 17:58:27,371] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,371] {__init__.py:150} DEBUG - Signature: JEwre9zNc+Ge6ezoGop6oXpp5Js=
[2020-01-20 17:58:27,372] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,372] {oauth1_auth.py:114} DEBUG - Updated url: https://stream.twitter.com/1.1/statuses/filter.json?delimited=length
[2020-01-20 17:58:27,372] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,372] {oauth1_auth.py:115} DEBUG - Updated headers: {'Content-Type': 'application/x-www-form-urlencoded', 'Content-Length': '22', 'Authorization': 'OAuth oauth_nonce="XXXXXX", oauth_timestamp="1579523307", oauth_version="1.0", oauth_signature_method="HMAC-SHA1", oauth_consumer_key="XXXXXX", oauth_token="XXXXXX", oauth_signature="JEwre9zNc%2BGe6ezoGop6oXpp5Js%3D"'}
[2020-01-20 17:58:27,373] {logging_mixin.py:112} INFO - [2020-01-20 17:58:27,373] {oauth1_auth.py:116} DEBUG - Updated body: '\x1b[1mtrack=life&language=en\x1b[0m'
[2020-01-20 17:58:32,274] {logging_mixin.py:112} INFO - [2020-01-20 17:58:32,273] {base_job.py:200} DEBUG - [heartbeat]
[2020-01-20 17:58:32,274] {logging_mixin.py:112} INFO - [2020-01-20 17:58:32,274] {local_task_job.py:124} DEBUG - Time since last heartbeat(0.01 s) < heartrate(5.0 s), sleeping for 4.990854 s
[2020-01-20 17:58:37,265] {logging_mixin.py:112} INFO - [2020-01-20 17:58:37,265] {local_task_job.py:103} INFO - Task exited with return code -6

After some research, apparently return code -6 means the task process was killed by signal 6, i.e. SIGABRT. I have no idea why the task process is aborting.
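
As a quick sanity check (this snippet is mine, not from the Airflow codebase), the signal module confirms the mapping from -6 to SIGABRT:

import signal

# A child process that exits with return code -N was killed by signal N.
print(signal.Signals(6))    # Signals.SIGABRT
print(int(signal.SIGABRT))  # 6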

Manually testing the task using airflow test stream_from_twitter stream_from_twitter_to_kafka 20200120 works like a charm, but it fails with the above log when run by the scheduler.

No idea what's going on here. I saw something similar at GoogleCloudStorageDownloadOperator "Task exited with return code -6", but there's no solution there either.

Edit:

  1. Pasted DEBUG logs

  2. airflow version: 1.10.7

  3. Executor: SequentialExecutor (default)

  4. DB backend: I'm using the default settings here (SQLite); I haven't modified the config to use MySQL or Postgres.

Grantor asked 20/1, 2020 at 10:53 Comment(3)
Hi Anees, can you change the logging level to DEBUG, retry the task, and provide those logs, please? Also, please let me know which executor you use (Sequential, Local, Celery or Kubernetes) and what version of Airflow you use. And is your DB backend Postgres or MySQL? (Larochelle)
@Larochelle I edited the question to include the information that you asked for! (Grantor)
@Larochelle FWIW, I tried with the LocalExecutor and a MySQL backend. It still failed. (Grantor)

This is unrelated to airflow or tweepy.

This issue is specific to macOS High Sierra and above. The fix described at https://mcmap.net/q/135170/-multiprocessing-causes-python-to-crash-and-gives-an-error-may-have-been-in-progress-in-another-thread-when-fork-was-called solved my issue.

Basically, airflow test merely runs the task in-process, but the scheduler starts a worker process that calls fork(), and apparently High Sierra introduced new security checks (fork safety in the Objective-C runtime) that break this fork() usage in Python.
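
Here is a minimal sketch of the kind of crash involved (my own illustration, not code from the DAG), assuming macOS High Sierra or later: touching an Objective-C backed framework in the parent and then again in a forked child can abort the child with SIGABRT unless OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES is set in the parent's environment.

import os
import urllib.request

# getproxies() goes through _scproxy / SystemConfiguration (Objective-C) on macOS.
urllib.request.getproxies()

pid = os.fork()
if pid == 0:
    # Child: the same call may trigger the fork-safety abort (SIGABRT, i.e. -6)
    # on High Sierra+ unless OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES is set.
    urllib.request.getproxies()
    os._exit(0)
else:
    _, status = os.waitpid(pid, 0)
    print("child exit status:", status)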

This also caused problems in Ansible. See https://github.com/ansible/ansible/issues/32499#issuecomment-341578864

Grantor answered 20/1, 2020 at 17:16 Comment(0)

I have exactly the same issue with a different operator, sftp_to_s3_operator: airflow test works, but even with the environment variable OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES in my .zshrc or .bash_profile file it still doesn't work.

Aspia answered 20/5, 2020 at 18:28 Comment(0)
