Airflow DAG fails when PythonOperator tries to call an API and download data

I'm trying to configure Airflow on my laptop for the first time (without using Docker, just following the documentation). My goal is to set up a simple ETL job.

I've written the simplest possible DAG with one PythonOperator:

from datetime import timedelta
from view import spotify_etl
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}
dag = DAG(
    'airflow_dag_tutorial-new',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)


run_etl = PythonOperator(
    task_id='main_task',
    python_callable=spotify_etl,
    dag=dag,
)

run_etl

When I pass a dummy function with a print statement, the DAG runs successfully. But when I pass my spotify_etl function, which calls the Spotify API, the DAG fails. This is the function:

import datetime

import requests


def spotify_etl():
    token = 'xxx'

    headers = {
        'Accept': "application/json",
        'Content-Type': "application/json",
        'Authorization': 'Bearer {token}'.format(token=token)
    }

    # Spotify's "after" cursor is a Unix timestamp in milliseconds
    # (despite the name, this goes back 100 days, not one)
    today = datetime.datetime.now()
    yesterday = today - datetime.timedelta(days=100)
    yesterday_unix_timestamp = int(yesterday.timestamp()) * 1000

    r = requests.get("https://api.spotify.com/v1/me/player/recently-played?after={time}".format(time=yesterday_unix_timestamp), headers=headers)
    data = r.json()
    print(data)

The error I get is:

[2020-11-08 12:35:23,453] {local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGABRT

Does anyone know how to use PythonOperator correctly for a function that calls an API? What is causing this error?

I tried setting export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES in my venv (as suggested in "Airflow task running tweepy exits with return code -6" and in https://github.com/ansible/ansible/issues/32499#issuecomment-341578864), but that doesn't seem to have fixed it.
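
One way to verify whether the variable actually reaches the task process is a throwaway task that prints it. A minimal sketch, assuming it lives in the same DAG file as above (the print_fork_safety_env name and check_env task id are just illustrative):

import os

def print_fork_safety_env():
    # If this prints None, the export never made it into the environment
    # of the scheduler/worker that forks the task processes.
    print(os.environ.get("OBJC_DISABLE_INITIALIZE_FORK_SAFETY"))

check_env = PythonOperator(
    task_id='check_env',
    python_callable=print_fork_safety_env,
    dag=dag,
)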

Sizzle answered 8/11, 2020 at 13:00 (2 comments)
Did you solve this? @arctic – Amphithecium
Yes, see the answer below. – Sizzle

It turned out that export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES was not being set correctly: it had to go in ~/.zshrc instead of ~/.bash_profile, because the login shell was zsh, so .bash_profile was never sourced. Once the variable was exported in the shell that launches the Airflow scheduler and webserver, the error went away. Background: Airflow runs each task in a forked subprocess, and on macOS the proxy lookup that requests performs touches an Objective-C system framework; the Objective-C runtime's fork-safety check then kills the forked process with SIGABRT unless this variable is set.
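
Update: if the export still doesn't stick (see the comments below), a related workaround is to skip the proxy lookup that triggers the crash in the first place. A minimal sketch, assuming it goes at the very top of the DAG file:

import os

# requests/urllib consult the macOS system proxy settings through an
# Objective-C framework, which is what trips the fork-safety abort in
# forked task processes. no_proxy="*" makes requests skip that lookup.
os.environ["no_proxy"] = "*"

Because the DAG file is imported again inside the forked task process, the assignment runs before spotify_etl makes its request.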

Sizzle answered 8/11, 2020 at 16:55 (3 comments)
@artic.queenolina Hey, I'm experiencing this same issue, trying to make an API call from a PythonOperator. In what file specifically do you set "export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES"? My current environment is Anaconda. – Gull
Related: #73582793 suggests os.environ["no_proxy"] = "*" – Aynat
That didn't work for me. I tried setting this OBJC env var in a dozen different ways, and I always get the SIGABRT signal for some reason... – Yawn
