SQLAlchemy engine from Airflow database hook
What's the best way to get a SQLAlchemy engine from an Airflow connection ID?

Currently I am creating a hook, retrieving its URI, then using it to create a SQLAlchemy engine.

from airflow.hooks.postgres_hook import PostgresHook
from sqlalchemy import create_engine

postgres_hook = PostgresHook(self.postgres_conn_id)
engine = create_engine(postgres_hook.get_uri())

This works but both commands make a connection to the database.

When I have "extra" parameters on the connection a third connection is needed to retrieve those (see Retrieve full connection URI from Airflow Postgres hook)

Is there a shorter and more direct method?

Burt answered 30/4, 2020 at 17:2 Comment(0)

To be clear, your commands do make two database connections, but to two separate databases (unless you're trying to connect to your Postgres Airflow metadata database). The first line, initializing the hook, should not make any connections. Only the second line first grabs the connection details from the Airflow database (which I don't think you can avoid), then uses them to connect to the Postgres database (which I think is the point).

You can make this slightly simpler, though, with:

postgres_hook = PostgresHook(self.postgres_conn_id)
engine = postgres_hook.get_sqlalchemy_engine()

That seems pretty clean, but if you want to be even more direct without going through PostgresHook, you could fetch the connection directly by querying Airflow's metadata database. However, that means you end up duplicating the code that builds a URI from the connection object. The underlying implementation of get_connection() is a good example if you want to proceed with this.

from airflow.models import Connection
from airflow.settings import Session
from sqlalchemy import create_engine

session = Session()
conn = session.query(Connection).filter(Connection.conn_id == self.postgres_conn_id).one()
... # build uri from connection
create_engine(uri)
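For the elided "build uri from connection" step, here is a minimal sketch (assuming a plain Postgres connection; the helper name `uri_from_connection` is illustrative, and real code would also need to account for extras):

```python
from urllib.parse import quote_plus

# Hypothetical helper: build a postgresql:// URI from an Airflow Connection's
# basic fields (login, password, host, port, schema). Extras are not handled here.
def uri_from_connection(conn):
    user = quote_plus(conn.login or "")       # percent-encode credentials so
    password = quote_plus(conn.password or "")  # special characters survive the URI
    port = conn.port or 5432
    return f"postgresql://{user}:{password}@{conn.host}:{port}/{conn.schema}"
```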

Additionally, if you want to be able to access the extras without a separate database fetch beyond what get_uri() or get_sqlalchemy_engine() does, you can override BaseHook.get_connection() to save the connection object for reuse. This requires creating your own hook on top of PostgresHook, so I understand that may not be ideal.

class CustomPostgresHook(PostgresHook):

    @classmethod
    def get_connection(cls, conn_id):  # type: (str) -> Connection
        conn = super().get_connection(conn_id)
        # Can't store this under the `conn` attribute, because PostgresHook overwrites
        # it with a different type of connection in
        # https://github.com/apache/airflow/blob/1.10.10/airflow/hooks/postgres_hook.py#L93
        cls.conn_obj = conn
        return conn

postgres_hook = CustomPostgresHook(self.postgres_conn_id)
uri = postgres_hook.get_uri()
# do something with postgres_hook.conn_obj.extras_dejson

Some built-in Airflow hooks already have this behavior (grpc, samba, tableau), but it's definitely not standardized.
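As a sketch of what you might then do with the cached extras, you could forward selected keys to SQLAlchemy as connect_args (the whitelist below is illustrative; which keys are safe to pass through depends on your driver):

```python
# Illustrative only: whitelist the "extra" keys your DB driver understands
# (e.g. psycopg2 accepts sslmode and connect_timeout as connect() kwargs).
ALLOWED_EXTRAS = {"sslmode", "connect_timeout"}

def extras_to_connect_args(extras):
    """Keep only the extras that can be passed straight to the driver."""
    return {k: v for k, v in extras.items() if k in ALLOWED_EXTRAS}

# Usage sketch (requires a real hook, so not runnable as-is):
# engine = create_engine(uri, connect_args=extras_to_connect_args(hook.conn_obj.extras_dejson))
```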

Unhandsome answered 2/5, 2020 at 17:10 Comment(3)
get_sqlalchemy_engine is excellent, thank you. Is there a neater way to include any "extra" params from the connection? This is the subject of my linked question. – Burt
Hi Daniel. Thanks for your complete answer. My question is whether we can have a connection pool on the Postgres database using this Airflow hook. Because, if we don't have a connection pool, lots of concurrent tasks may put a heavy load on the pg database. – Aviatrix
@Aviatrix you can solve the pooling problem with Airflow task pools. – Throaty

The accepted answer (get_sqlalchemy_engine()) is now partially incorrect, because SQLAlchemy no longer accepts the "postgres://" scheme. It must be "postgresql://".

According to this issue, it effectively always used to work "by coincidence", not by design.

This is the function I use:

import re

from sqlalchemy import create_engine


def get_postgres_sqlalchemy_engine(self, hook, engine_kwargs=None):
    """
    Get an sqlalchemy_engine object.

    :param engine_kwargs: Kwargs used in :func:`~sqlalchemy.create_engine`.
    :return: the created engine.
    """
    if engine_kwargs is None:
        engine_kwargs = {}
    conn_uri = hook.get_uri().replace("postgres:/", "postgresql:/")
    conn_uri = re.sub(r"\?.*$", "", conn_uri)  # strip query params SQLAlchemy may reject
    return create_engine(conn_uri, **engine_kwargs)

https://github.com/apache/airflow/issues/34876

https://lists.apache.org/thread/8rhmz3qh30hvkondct4sfmgk4vd07mn5

Historically, before DbApiHook was introduced, some "DB-API 2" hooks (in quotes because not all of them properly implement this API) implemented this method separately, and some did not implement it at all.

Everything changed once DbApiHook was introduced; now 23 community hooks are based on it. DbApiHook implements it in an interesting way:

def get_uri(self) -> str:
    """
    Extract the URI from the connection.

    :return: the extracted uri.
    """
    conn = self.get_connection(getattr(self, self.conn_name_attr))
    conn.schema = self.__schema or conn.schema
    return conn.get_uri()

And it is used by default in:

def get_sqlalchemy_engine(self, engine_kwargs=None):
    """
    Get an sqlalchemy_engine object.

    :param engine_kwargs: Kwargs used in :func:`~sqlalchemy.create_engine`.
    :return: the created engine.
    """
    if engine_kwargs is None:
        engine_kwargs = {}
    return create_engine(self.get_uri(), **engine_kwargs)

IMO there are two bad assumptions:

  1. "You could transform the Airflow Connection URI to the SQLAlchemy URI". Many of the providers also have additional extra parameters which are not supported by SQLalchemy. In additional every changes in Airflow Connection URI format might breaks result even if it worked before: https://github.com/apache/airflow/issues/34456

  2. "All DB implements SQLALchemy Dialect ( https://docs.sqlalchemy.org/en/20/dialects/) and we have packages for it"

I've split the hooks into groups.

Group 1 (7 Hooks): At first glance these work fine, but it would be better if someone else could also take a look

  • SnowflakeHook from snowflake provider
  • BigQueryHook from google provider
  • SpannerHook from google provider
  • RedshiftSQLHook from amazon provider
  • OdbcHook from odbc provider
  • DrillHook from apache.drill provider
  • DruidDbApiHook from apache.druid provider

Group 2 (10 Hooks): Do not override the DbApiHook.get_uri implementation

  • MySqlHook from mysql provider. In addition, this hook supports two different drivers
  • OracleHook from oracle provider
  • JdbcHook from jdbc provider
  • TrinoHook from trino provider
  • HiveServer2Hook from apache.hive provider
  • ImpalaHook from apache.impala provider
  • DatabricksSqlHook from databricks provider
  • ExasolHook from exasol provider
  • PrestoHook from presto provider
  • VerticaHook from vertica provider

Group 3 (3 Hooks): Use Connection.get_uri to construct the SQLAlchemy URI

  • PostgresHook from postgres provider
  • MsSqlHook from microsoft.mssql provider
  • SqliteHook from sqlite provider

Group 4 (2 Hooks): Implementations which would not work with the get_sqlalchemy_engine method

Group 5 (1 Hook): Hooks from suspended providers

  • QuboleCheckHook

I'm not sure what we can do in the current situation. My proposal:

  1. Add a UserWarning in DbApiHook.get_uri, and maybe in DbApiHook.get_sqlalchemy_engine, informing users that these methods might not work correctly without re-implementation
  2. Implement get_uri without Connection.get_uri for Group 3
  3. Raise NotImplementedError in get_sqlalchemy_engine for Group 4
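Proposal 1 could look roughly like this (an illustrative sketch, not the actual patch; the class and URI below are placeholders):

```python
import warnings

class DbApiHookSketch:
    """Illustrative stand-in for DbApiHook, showing where the warning would go."""

    def get_uri(self):
        # Warn that the generic Airflow-URI-to-SQLAlchemy-URI conversion is fragile.
        warnings.warn(
            "get_uri() derives a SQLAlchemy URI from the Airflow connection URI; "
            "this may be incorrect for some backends without re-implementation.",
            UserWarning,
            stacklevel=2,
        )
        return "postgresql://user:pass@host:5432/db"  # placeholder URI
```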

I hope the collective wisdom can help solve this puzzle, and I welcome other ideas.

----
Best wishes,
Andrey Anshin

Eliathan answered 26/1 at 11:54 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.