I'm trying to configure Airflow on my laptop for the first time (without Docker, just following the documentation). My goal is to set up a simple ETL job.
I've written the simplest possible DAG with one PythonOperator:
from datetime import timedelta
from view import spotify_etl
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG(
    'airflow_dag_tutorial-new',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

run_etl = PythonOperator(
    task_id='main_task',
    python_callable=spotify_etl,
    dag=dag,
)
run_etl
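When I pass a dummy function with a print statement, the DAG runs successfully. For reference, it's a trivial callable along these lines (the name and message here are just placeholders):

def dummy_etl():
    # Placeholder callable that only prints; the DAG succeeds with this.
    print("ETL job ran")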
But when I pass my function spotify_etl, which calls the Spotify API, the DAG fails. This is the function:
import datetime
import requests


def spotify_etl():
    token = 'xxx'
    headers = {
        'Accept': "application/json",
        'Content-Type': "application/json",
        'Authorization': 'Bearer {token}'.format(token=token)
    }
    today = datetime.datetime.now()
    yesterday = today - datetime.timedelta(days=100)
    # Spotify's 'after' parameter expects a Unix timestamp in milliseconds
    yesterday_unix_timestamp = int(yesterday.timestamp()) * 1000
    r = requests.get("https://api.spotify.com/v1/me/player/recently-played?after={time}".format(time=yesterday_unix_timestamp), headers=headers)
    data = r.json()
    print(data)
The error I get is:
[2020-11-08 12:35:23,453] {local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGABRT
Does anyone know how to use PythonOperator correctly for a function that calls an API? What is causing this error?
I tried setting export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES in my venv (as suggested here: Airflow task running tweepy exits with return code -6, and here: https://github.com/ansible/ansible/issues/32499#issuecomment-341578864), but that doesn't seem to have fixed it.
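For completeness, this is roughly how I applied it (assuming the Airflow processes are launched from the same shell where the variable is exported, since they only inherit it that way):

export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
airflow scheduler
# the webserver needs the variable too if started in a separate shell:
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
airflow webserver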