To be clear, your commands will indeed make two database connections, but to two separate databases (unless you're trying to connect to your Postgres Airflow database). The first line, which initializes the hook, should not make any connections. Only the second line does: it first grabs the connection details from the Airflow database (which I don't think you can avoid), then uses them to connect to the Postgres database (which I think is the point).
You can make it slightly simpler, though, with:
postgres_hook = PostgresHook(self.postgres_conn_id)
engine = postgres_hook.get_sqlalchemy_engine()
That seems pretty clean, but if you want to be even more direct without going through PostgresHook, you could fetch the connection directly by querying Airflow's database. However, that means you'll end up duplicating the code that builds a URI from the connection object. The underlying implementation of get_connection() is a good example if you want to proceed with this.
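from airflow.models import Connection
from airflow.settings import Session
from sqlalchemy import create_engine

session = Session()  # session against Airflow's own metadata database
conn = session.query(Connection).filter(Connection.conn_id == self.postgres_conn_id).one()
... # build uri from connection
engine = create_engine(uri)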
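For the "build uri from connection" step, here is a minimal sketch of what that duplication might look like. It is not Airflow's implementation: `ConnFields` is a hypothetical stand-in for the fields you would read off the Connection row, `build_postgres_uri` is a made-up helper name, and percent-encoding the credentials is my own assumption about what you would want.

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import quote


@dataclass
class ConnFields:
    """Hypothetical stand-in for the fields read off an Airflow Connection row."""
    host: str
    login: Optional[str] = None
    password: Optional[str] = None
    port: Optional[int] = None
    schema: Optional[str] = None


def build_postgres_uri(conn: ConnFields) -> str:
    """Assemble a postgresql:// URI, percent-encoding the credentials."""
    auth = ""
    if conn.login:
        auth = quote(conn.login, safe="")
        if conn.password:
            auth += ":" + quote(conn.password, safe="")
        auth += "@"
    host = conn.host
    if conn.port:
        host += f":{conn.port}"
    # The schema field holds the database name; leave it empty if unset.
    return f"postgresql://{auth}{host}/{conn.schema or ''}"


uri = build_postgres_uri(
    ConnFields(host="db.example.com", login="etl", password="p@ss/word",
               port=5432, schema="analytics")
)
```

The resulting string is what you would hand to create_engine(). Encoding the password matters if it contains characters such as "@" or "/", which would otherwise break URI parsing.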
Additionally, if you want to be able to access the extras without a separate database fetch beyond what get_uri() or get_sqlalchemy_engine() does, you can override BaseHook.get_connection() to save the connection object to an instance variable for reuse. This would require creating your own hook on top of PostgresHook, so I understand that may not be ideal.
class CustomPostgresHook(PostgresHook):
    # BaseHook declares get_connection() as a classmethod; it is overridden here as an
    # instance method so the result can be stored on self.
    def get_connection(self, conn_id):  # type: (str) -> Connection
        conn = super().get_connection(conn_id)
        # Can't use self.conn: PostgresHook overwrites it with a different type of connection, see
        # https://github.com/apache/airflow/blob/1.10.10/airflow/hooks/postgres_hook.py#L93
        self.conn_obj = conn
        return conn
postgres_hook = CustomPostgresHook(self.postgres_conn_id)
uri = postgres_hook.get_uri()
# do something with postgres_hook.conn_obj.extra_dejson
Some built-in Airflow hooks have this behavior already (grpc, samba, tableau), but it's definitely not standardized.
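As for "doing something" with the extras, one possibility is to turn them into driver keyword arguments. The sketch below is only an illustration under assumptions: `extras_to_connect_args` and `ALLOWED_KEYS` are hypothetical names, the whitelist is a guess at which libpq options your connection might store, and the input is the raw JSON string from the connection's extra field.

```python
import json
from typing import Any, Dict, Iterable

# Assumed whitelist of libpq keywords; adjust to what your connection actually stores.
ALLOWED_KEYS = ("sslmode", "sslrootcert", "connect_timeout", "application_name")


def extras_to_connect_args(extra: str, allowed: Iterable[str] = ALLOWED_KEYS) -> Dict[str, Any]:
    """Parse the connection's JSON extra string and keep only whitelisted keys."""
    if not extra:
        return {}
    try:
        parsed = json.loads(extra)
    except ValueError:
        # Malformed JSON: fall back to no extra arguments.
        return {}
    if not isinstance(parsed, dict):
        return {}
    allowed_set = set(allowed)
    return {key: value for key, value in parsed.items() if key in allowed_set}


connect_args = extras_to_connect_args(
    '{"sslmode": "require", "connect_timeout": 10, "iam": true}'
)
```

The filtered dict could then be passed along as create_engine(uri, connect_args=connect_args). Filtering avoids handing Airflow-specific keys (like "iam" in this made-up example) to the database driver, which would reject unknown options.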
get_sqlalchemy_engine is excellent, thank you. Is there a neater way to include any "extra" params from the connection? This is the subject of my linked question – Burt