Airflow: How to SSH and run BashOperator from a different server
M

4

38

Is there a way to SSH to a different server and run a BashOperator using Airbnb's Airflow? I am trying to run a Hive SQL command with Airflow, but I need to SSH to a different box in order to run the Hive shell. My tasks should look like this:

  1. SSH to server1
  2. start Hive shell
  3. run Hive command

Thanks!

Mohammed answered 12/9, 2016 at 19:38 Comment(0)
M
49

Note: this approach is NOT available in Airflow 2.x.

I think that I just figured it out:

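  1. Create an SSH connection in the UI under Admin > Connections. Note: the connection will be deleted if you reset the database.

  2. In the Python file, add the following:

     from airflow.contrib.hooks import SSHHook
     # The operator used in step 3 must be imported as well
     from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator
     sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)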
    
  3. Add the SSH operator task

     t1 = SSHExecuteOperator(
         task_id="task1",
         bash_command=<YOUR COMMAND>,
         ssh_hook=sshHook,
         dag=dag)
    

Thanks!
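
For the Hive case in the question, steps 2 and 3 can usually be folded into the remote command itself, since the Hive CLI accepts a query through `hive -e`. A minimal sketch, assuming `hive` is on the remote user's PATH; the helper name `build_hive_command` and the table name `my_table` are hypothetical and not part of Airflow or the original answer:

```python
import shlex


def build_hive_command(query: str) -> str:
    """Return a shell command that runs one Hive query non-interactively."""
    # shlex.quote protects quotes and spaces in the query from the remote shell
    return "hive -e " + shlex.quote(query)


# The resulting string is what would be handed to the SSH task as its command
hive_cmd = build_hive_command("SELECT COUNT(*) FROM my_table")
```

The string can then be passed as the task's command argument (`bash_command` for SSHExecuteOperator, `command` for the later SSHOperator, as noted in the comments).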

Mohammed answered 14/9, 2016 at 15:29 Comment(7)
Note you also have to import the operator: from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator - Hierarchize
With the latest Airflow version, 1.10, SSHExecuteOperator is deprecated and the new SSHOperator has to be used. If anyone is using 1.10, the new imports should be from airflow.contrib.hooks.ssh_hook import SSHHook and from airflow.contrib.operators.ssh_operator import SSHOperator. - Quant
What parameters are required for creating an SSH connection in an Airflow variable? - Hoedown
@Quant the SSHOperator is broken: 'NoneType' object has no attribute 'startswith'. Where can I find SSHExecuteOperator? Is it still available? - Tollmann
@Tollmann you are getting that because you are using bash_command in the params. Use "command" instead of "bash_command" and you won't get the error. Using bash_command leaves the command attribute as None, which results in the error. - Altercate
@ferris Create a SSH connection in UI under Admin > Connection ... could you please share screenshots for these steps as well? I am badly stuck and getting the error [Errno 2] No such file or directory: '20-march.pem'. I have passed Extra: {"key_file": "20-march.pem"} - Heterogeneous
Hi @AshishKarpe, you can see my demo code I just added. - Signatory
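
The `[Errno 2] No such file or directory` error reported in the comments is typical of a relative `key_file` path, which gets looked up relative to whatever directory the Airflow process happens to run in. A small sketch for checking the path before putting it into the connection's Extra field; `resolve_key_file` is a hypothetical helper, not an Airflow API:

```python
import os


def resolve_key_file(path: str) -> str:
    """Return an absolute path to the key file, or raise if it is missing."""
    absolute = os.path.abspath(os.path.expanduser(path))
    if not os.path.isfile(absolute):
        raise FileNotFoundError(f"SSH key file not found: {absolute}")
    return absolute
```

Run it on the machine that actually executes the task, and use the returned absolute path as the `key_file` value.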
N
31

One thing to note with Anton's answer is that the argument is actually ssh_conn_id, not conn_id for the SSHOperator object. At least in version 1.10.

A quick example would look like

from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': datetime.now() - timedelta(minutes=20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(dag_id='testing_stuff',
          default_args=default_args,
          schedule_interval='0,10,20,30,40,50 * * * *',
          dagrun_timeout=timedelta(seconds=120))
# Step 1 - Dump data from postgres databases
t1_bash = """
echo 'Hello World'
"""
t1 = SSHOperator(
    ssh_conn_id='ssh_default',
    task_id='test_ssh_operator',
    command=t1_bash,
    dag=dag)
Norword answered 15/11, 2018 at 21:43 Comment(6)
This should be the answer for Apache Airflow 1.10. - Zimbabwe
Nicely done. LOL@ # Step 1 - Dump data from postgres databases - Gensler
BTW, where can I find ssh_conn_id, or can I just name it arbitrarily? - Jocelin
ssh_conn_id is the name you type when you create the connection in the Airflow UI, or the name of an existing one under Admin -> Connections. - Hoptoad
What parameters are required for creating an SSH connection in an Airflow variable? - Hoedown
I got this error: 'NoneType' object has no attribute 'startswith' - Should
C
11

Here is a working example with the ssh operator in Airflow 2:

[BEWARE: the output of this operator is base64 encoded]

from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
sshHook = SSHHook(ssh_conn_id="conn-id", key_file='/opt/airflow/keys/ssh.key')
# a hook can also be defined directly in the code:
# sshHook = SSHHook(remote_host='server.com', username='admin', key_file='/opt/airflow/keys/ssh.key')

ls = SSHOperator(
    task_id="ls",
    command="ls -l",
    ssh_hook=sshHook,
    dag=dag)

The conn-id is the one set under Admin -> Connections. The key_file is the path to the private SSH key.
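
On the base64 warning above: when the operator's result is read back from XCom it arrives as a base64 string, so it has to be decoded before use. A minimal sketch using only the standard library; `decode_ssh_output` is a hypothetical helper name, and UTF-8 is an assumption about the remote command's output encoding:

```python
import base64


def decode_ssh_output(encoded: str) -> str:
    """Decode the base64 string an SSH task pushed to XCom into plain text."""
    return base64.b64decode(encoded).decode("utf-8")
```

In a downstream task this would typically be applied to the value pulled with `ti.xcom_pull(task_ids="ls")`.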

Corbeil answered 9/6, 2021 at 11:15 Comment(1)
Hi @artBcode, is the `key_file` the public key of my Airflow machine? - Calfee
S
1

Demo for Airflow 2.X:

First, create a Connection URI:

# See the docs: https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html
# On the Airflow host, run this shell command:
airflow connections add 'ssh_dt17' --conn-uri 'ssh://[username]:[password]@192.168.1.17'

Creating a Connection from the CLI
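
If the username or password contains characters such as `@`, `/` or `:`, they need to be percent-encoded before they go into the connection URI. A sketch of building the URI with the standard library; `build_ssh_conn_uri` is a hypothetical helper, and the credentials shown are made up for illustration:

```python
from urllib.parse import quote


def build_ssh_conn_uri(username: str, password: str, host: str) -> str:
    """Build an ssh:// connection URI with percent-encoded credentials."""
    # safe="" makes quote() encode every reserved character, including "/" and "@"
    user = quote(username, safe="")
    secret = quote(password, safe="")
    return f"ssh://{user}:{secret}@{host}"


uri = build_ssh_conn_uri("deploy", "p@ss/word", "192.168.1.17")
print(uri)
```

The printed value is what would be passed to `--conn-uri` when adding the connection from the CLI.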

Second, the demo DAG code:


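from datetime import datetime

import pendulum  # assumption: `tz` in the original was a pendulum timezone

from airflow.decorators import dag, task
# from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, get_current_context
from airflow.operators.dummy import DummyOperator

from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.ssh.hooks.ssh import SSHHook

# Assumed placeholders: the original snippet used `tz` and `default_args` without defining them
tz = pendulum.timezone("UTC")
default_args = {"owner": "airflow"}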


@dag(
    default_args=default_args,
    description='demo',
    schedule_interval=None,
    start_date=datetime(2022, 9, 20, tzinfo=tz),
    catchup = False, 
    max_active_tasks = 1,

)
def demo_run_ssh_remote_cmd():
    
    # ssh_conn_id
    ################################################################
    ssh_dt17 = SSHHook(ssh_conn_id='ssh_dt17', remote_host='192.168.1.17')

    # dt17 ssh run remote cmd
    ################################################################    
    cmd_logrotate =(r'''
        /usr/sbin/logrotate -v -f /etc/logrotate.d/access_log_8am_8pm
    ''')

    logrotate_ad = SSHOperator(
        task_id='logrotate_ad',
        command=cmd_logrotate,
        ssh_hook=ssh_dt17,
        max_active_tis_per_dag=1,  
        cmd_timeout = 60*5,       

        # trigger_rule="none_failed",
    )

    # =============================================================================================
    start = DummyOperator(task_id="start")
    start >> logrotate_ad 


_ = demo_run_ssh_remote_cmd()  
Signatory answered 8/5, 2023 at 7:47 Comment(0)
