Airflow s3 connection using UI

I've been trying to use Airflow to schedule a DAG. One of the DAGs includes a task which loads data from an S3 bucket.

For that I need to set up an S3 connection, but the UI provided by Airflow isn't that intuitive (http://pythonhosted.org/airflow/configuration.html?highlight=connection#connections). Has anyone succeeded in setting up an S3 connection? If so, are there any best practices you folks follow?

Thanks.

Grampositive answered 12/10, 2016 at 11:41 Comment(1)
link is broken.Glee

EDIT: This answer stores your secret key in plain text which can be a security risk and is not recommended. The best way is to put access key and secret key in the login/password fields, as mentioned in other answers below. END EDIT

It's hard to find references, but after digging a bit I was able to make it work.

TLDR

Create a new connection with the following attributes:

Conn Id: my_conn_S3

Conn Type: S3

Extra:

{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}

Long version, setting up UI connection:

  • On Airflow UI, go to Admin > Connections
  • Create a new connection with the following attributes:
  • Conn Id: my_conn_S3
  • Conn Type: S3
  • Extra: {"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
  • Leave all the other fields (Host, Schema, Login) blank.
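
If you would rather script this than click through the UI, the same connection can be created against the Airflow metadata database. This is only a minimal sketch using the placeholder keys from above; run it once on the machine where Airflow is installed:

from airflow import settings
from airflow.models import Connection

# Same attributes as the UI steps above (keys are placeholders).
conn = Connection(
    conn_id='my_conn_S3',
    conn_type='s3',
    extra='{"aws_access_key_id": "_your_aws_access_key_id_", '
          '"aws_secret_access_key": "_your_aws_secret_access_key_"}')

session = settings.Session()
session.add(conn)   # re-running this adds a duplicate row, so run it only once
session.commit()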

To use this connection, below you can find a simple S3 sensor test. The idea of this test is to set up a sensor that watches for files in S3 (the first task) and, once the condition is satisfied, triggers a bash command (the second task).

Testing

  • Before running the DAG, ensure you have an S3 bucket named 'S3-Bucket-To-Watch'.
  • Add the s3_dag_test.py below to the Airflow dags folder (~/airflow/dags).
  • Start airflow webserver.
  • Go to Airflow UI (http://localhost:8383/)
  • Start airflow scheduler.
  • Turn on 's3_dag_test' DAG on the main DAGs view.
  • Select 's3_dag_test' to show the dag details.
  • On the Graph View you should be able to see its current state.
  • 'check_s3_for_file_in_s3' task should be active and running.
  • Now, add a file named 'file-to-watch-1' to your 'S3-Bucket-To-Watch'.
  • The first task should now have completed, and the second should start and finish.

The schedule_interval in the dag definition is set to '@once', to facilitate debugging.

To run it again, leave everything as it is, remove the files in the bucket, then select the first task (in the Graph View) and choose 'Clear' with 'Past', 'Future', 'Upstream', and 'Downstream' activity selected. This should kick off the DAG again.

Let me know how it went.

s3_dag_test.py:

"""
S3 Sensor Connection Test
"""

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor,   BashOperator, EmailOperator, S3KeySensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 11, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_dag_test', default_args=default_args, schedule_interval= '@once')

t1 = BashOperator(
    task_id='bash_test',
    bash_command='echo "hello, it should work" > s3_conn_test.txt',
    dag=dag)

sensor = S3KeySensor(
    task_id='check_s3_for_file_in_s3',
    bucket_key='file-to-watch-*',
    wildcard_match=True,
    bucket_name='S3-Bucket-To-Watch',
    s3_conn_id='my_conn_S3',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag)

t1.set_upstream(sensor)
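
If you are on a newer Airflow release, the imports have moved and the sensor takes aws_conn_id instead of s3_conn_id (see the comments and answers below). A minimal sketch of just the lines that change, assuming Airflow 1.10.x; the rest of the DAG stays the same:

from airflow.operators.bash_operator import BashOperator
from airflow.sensors.s3_key_sensor import S3KeySensor

# Same sensor as above, but the connection argument is now aws_conn_id.
sensor = S3KeySensor(
    task_id='check_s3_for_file_in_s3',
    bucket_key='file-to-watch-*',
    wildcard_match=True,
    bucket_name='S3-Bucket-To-Watch',
    aws_conn_id='my_conn_S3',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag)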

Cragsman answered 23/11, 2016 at 21:22 Comment(11)
Thank you so much, definitely helped meGrampositive
Don't use dots in your bucket name; it won't work, due to a known issue with boto.Detestation
Thanks this was helpful. In version 1.8.1+ the imports have changed, e.g. use from airflow.operators.bash_operator import BashOperator and from airflow.operators.sensors import s3KeySensor I also tried to find the file s3_conn_test.txt on the server and it wasn't there. I checked the logs and it looks like the scripts run in some subdirectory of /tmp/ which is subsequently deleted when the task finishes, so it might be better to write to an explicit path that the airflow user has permission to.Careerist
On 1.9 I'm getting Failed to create record. Incorrect padding when i do thisAnnabal
@Careerist it's a capital S not a lower case s for S3KeySensor.Inactive
To clear up any confusion for others the Conn Id: my_conn_S3 would just be Conn Id: s3://the_name_of_your_bucket and if your Airflow is on an ec2-instance then you don't need to fill out the extra part with the access_key_id and secret_access_key since ec2 handles that internally. Also, if you want to sense ANY file just make bucket_key='*' instead of bucket_key='file-to-watch-*'. And last but not least you can find the documentation for S3KeySensor here airflow.apache.org/code.htmlInactive
PendingDeprecationWarning: Invalid arguments were passed to S3KeySensor. Support for passing such arguments will be dropped in Airflow 2.0.Boyce
@KyleBridenstine I'm running airflow on EC2 machine and it writes : botocore.exceptions.NoCredentialsError: Unable to locate credentialsBoyce
Can you run AWS CLI commands on the ec2-instance? SSH onto your ec2-instance and try running an S3 copy command to the S3 bucket you are trying to hit, without giving it the AWS credentials in the command (to see if the ec2-instance can find them): run "touch foobar.txt" then "aws s3 cp foobar.txt s3://mybucket/foobar.txt". If it says it can't find credentials, then your ec2-instance isn't set up properly with its credentials. You probably just need to give your ec2-instance access to S3.Inactive
I recently found a tutorial about that topic: blog.sicara.com/…Mcanally
I wish Anselmo would edit this answer since this is not the right approach anymore. This exposes the secret key/password in plain text. See @Ash's answers belowUntread

Assuming Airflow is hosted on an EC2 server:

Just create the connection as in the other answers, but leave everything blank in the configuration apart from the connection type, which should stay as S3.

The S3Hook will default to boto, which in turn defaults to the role of the EC2 server you are running Airflow on. Assuming this role has rights to S3, your task will be able to access the bucket.

This is a much safer way than using and storing credentials.
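
To sanity-check that the instance role is actually being picked up, you can call the S3 hook directly against such a "blank" connection. This is only a rough sketch, assuming Airflow 1.10.x and a connection named 'my_conn_S3' that has nothing but the connection type set:

from airflow.hooks.S3_hook import S3Hook

# No credentials stored in the connection, so boto falls back to the
# EC2 instance role (or anything else in boto's default credential chain).
hook = S3Hook(aws_conn_id='my_conn_S3')
print(hook.check_for_bucket('S3-Bucket-To-Watch'))  # True if the role can see the bucket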

Queri answered 5/9, 2017 at 5:16 Comment(2)
One obvious drawback is that you might not want to use a single role though, right?Poesy
this saved me big time! thanks a bunch for this comment. We use MFA and I am pretty sure MFA was messing up our authentication, and we were getting AccessDenied for PutObject. If anyone has any ideas about how to make it work when MFA is required, let me know.Gambeson

If you are worried about exposing the credentials in the UI, another way is to pass the credential file location in the Extra param in the UI. Only the functional user has read privileges on the file. It looks something like this:

Extra:  {
    "profile": "<profile_name>", 
    "s3_config_file": "/home/<functional_user>/creds/s3_credentials", 
    "s3_config_format": "aws" }

file "/home/<functional_user>/creds/s3_credentials" has below entries

[<profile_name>]
aws_access_key_id = <access_key_id>
aws_secret_access_key = <secret_key>
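
To confirm that the profile name and the credentials file resolve at all (independently of Airflow), you can point boto3 at the same file. A small sketch, reusing the placeholder path and profile name from above:

import os
import boto3

# Point boto3 at the same file passed to Airflow via "s3_config_file".
os.environ['AWS_SHARED_CREDENTIALS_FILE'] = '/home/<functional_user>/creds/s3_credentials'

session = boto3.Session(profile_name='<profile_name>')
creds = session.get_credentials()
print(creds.access_key if creds else 'no credentials found')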
Carditis answered 1/2, 2017 at 16:43 Comment(0)

Another option that worked for me was to put the access key as the "login" and the secret key as the "password":

Conn Id: <arbitrary_conn_id>
Conn Type: S3
Login: <aws_access_key>
Password: <aws_secret_key>

Leave all other fields blank.
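
The same login/password pair can also be supplied without the UI, since Airflow reads connections from AIRFLOW_CONN_<CONN_ID> environment variables in URI form. A rough sketch of building such a URI with placeholder keys (the secret usually needs URL-encoding because it can contain '/' or '+'):

from urllib.parse import quote

aws_access_key = '<aws_access_key>'
aws_secret_key = '<aws_secret_key>'

# Login and password are embedded in the connection URI; the host part stays empty.
uri = 's3://{}:{}@'.format(quote(aws_access_key, safe=''), quote(aws_secret_key, safe=''))
print(uri)  # export as AIRFLOW_CONN_<ARBITRARY_CONN_ID> before starting the scheduler/webserver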

Liberati answered 22/9, 2018 at 14:53 Comment(0)

We added this to our docs a few versions ago:

http://airflow.apache.org/docs/stable/howto/connection/aws.html

There is no difference between an AWS connection and an S3 connection.

The accepted answer here has key and secret in the extra/JSON, and while that still works (as of 1.10.10) it is not recommended anymore as it displays the secret in plain text in the UI.

Serranid answered 24/5, 2020 at 17:29 Comment(1)
could you provide a comment to the answer I posted (to my own question) here: #69080416 In short: Adding the host and port through airflow's UI did not work for me. Had to add them to the extras field.Raskind

For the new version, change the Python code in the sample above from

s3_conn_id='my_conn_S3'

to

aws_conn_id='my_conn_S3'
Briarwood answered 15/2, 2018 at 6:5 Comment(1)
What is the Conn Id: my_conn_S3? Is that just like s3://name_of_my_bucket? And for bucket_name='S3-Bucket-To-Watch' what if you don't know the name of the file and just want this to sense any new file added?Inactive
Conn Id: example_s3_connection
Conn Type: S3
Extra: {"aws_access_key_id":"xxxxxxxxxx", "aws_secret_access_key": "yyyyyyyyyyy"}

Note: Login and Password fields are left empty.

Moneymaking answered 12/11, 2018 at 15:13 Comment(1)
Warning - this will have your secret key available in plaintext and can be a security issue! use the answer by @Liberati above using the login/pwUntread

For AWS in China, this doesn't work on airflow==1.8.0; you need to update to 1.9.0. Note that in 1.9.0 the package name changed to apache-airflow==1.9.0.

Seismic answered 6/3, 2018 at 6:7 Comment(0)
