Google Cloud Composer and Google Cloud SQL
F

6

11

What ways do we have available to connect to a Google Cloud SQL (MySQL) instance from the newly introduced Google Cloud Composer? The intention is to get data from a Cloud SQL instance into BigQuery (perhaps with an intermediary step through Cloud Storage).

  1. Can the Cloud SQL Proxy be exposed in some way on pods that are part of the Kubernetes cluster hosting Composer?

  2. If not, can the Cloud SQL Proxy be brought in by using the Kubernetes Service Broker? -> https://cloud.google.com/kubernetes-engine/docs/concepts/add-on/service-broker

  3. Should Airflow be used to schedule and call GCP API commands, i.e. (1) export the MySQL table to Cloud Storage, then (2) load the MySQL export into BigQuery?

  4. Perhaps there are other methods that I am missing to get this done.

Foudroyant answered 3/5, 2018 at 11:45 Comment(1)
Raj, did you finally find a way? I'm also trying to do so. - Neuman
M
7

"The Cloud SQL Proxy provides secure access to your Cloud SQL Second Generation instances without having to whitelist IP addresses or configure SSL." -Google CloudSQL-Proxy Docs

CloudSQL Proxy seems to be the recommended way to connect to CloudSQL above all others. So in Composer, as of release 1.6.1, we can create a new Kubernetes Pod to run the gcr.io/cloudsql-docker/gce-proxy:latest image, expose it through a service, then create a Connection in Composer to use in the operator.

To get set up:

  • Follow Google's documentation

  • Test the connection using info from Arik's Medium Post

    • Check that the pod was created: kubectl get pods --all-namespaces

    • Check that the service was created: kubectl get services --all-namespaces

    • Open a shell in a worker pod: kubectl --namespace=composer-1-6-1-airflow-1-10-1-<some-uid> exec -it airflow-worker-<some-uid> bash

      • Test the MySQL connection: mysql -u composer -p --host <service-name>.default.svc.cluster.local
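
If the mysql client is not available in the worker pod, a plain TCP check from Python can confirm that the proxy service is reachable before you debug credentials. This is a minimal sketch using only the standard library; the helper name can_reach and the host shown in the comment are illustrative assumptions, not part of the original setup.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the name and opens the socket in one step.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False


# Example call from inside a worker pod (hypothetical service name):
# can_reach("<service-name>.default.svc.cluster.local", 3306)
```

A False result points at DNS or networking (wrong namespace, missing service) rather than at the database user or password.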

Notes:

  • Composer now uses namespaces to organize pods

  • A pod can reach a service in another namespace only through the service's full path <k8-service-name>.<k8-namespace-name>.svc.cluster.local; the short service name resolves only within the same namespace

  • Creating a new Composer Connection with the full path will enable successful connection
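
The full path described in the notes above can be assembled programmatically so that the service name and namespace are never mixed up. The sketch below assumes the default cluster domain cluster.local, as used in this answer; the function name and the example service name are hypothetical.

```python
def service_fqdn(service: str, namespace: str = "default") -> str:
    """Build the cluster-internal DNS name of a Kubernetes Service."""
    for label in (service, namespace):
        # Each part must be a single DNS label, so dots are not allowed.
        if not label or "." in label:
            raise ValueError(f"invalid DNS label: {label!r}")
    return f"{service}.{namespace}.svc.cluster.local"


# Hypothetical service name; use the one shown by `kubectl get services`.
host = service_fqdn("cloudsql-proxy-service", "default")
print(host)
```

The resulting string is what goes into the Host field of the Composer (Airflow) Connection.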

Mesentery answered 20/5, 2019 at 23:13 Comment(4)
Worked for me. The only thing I had to change was the user: 'composer' did not exist; instead it was 'root' with no password. - Jessiajessica
Link to Google's documentation [github.com/GoogleCloudPlatform/cloudsql-proxy/blob/master/…] is not working. - Brody
Same here: the <k8-namespace-name> was "default" for me and the username was "root", not "composer". My Composer environment was composer-1-16-5-airflow-1-10-15. - Jamila
Thanks @Micah Miller, this worked like a charm. The issue was with the namespace. The CloudSqlProxy deployment and its services were running in the default namespace, whereas the airflow-scheduler was in a different namespace, and the connection succeeded after giving the full path <k8-service-name>.<k8-namespace-name>.svc.cluster.local. - Lumen
C
4

We had the same problem but with a Postgres instance. This is what we did, and got it to work:

  • create a sqlproxy deployment in the Kubernetes cluster where airflow runs. This was a copy of the existing airflow-sqlproxy used by the default airflow_db connection with the following changes to the deployment file:

    • replace all instances of airflow-sqlproxy with the new proxy name
    • under 'spec: template: spec: containers: command: -instances', replace the existing instance name with the name of the new instance we want to connect to
  • create a kubernetes service, again as a copy of the existing airflow-sqlproxy-service with the following changes:

    • replace all instances of airflow-sqlproxy with the new proxy name
    • under 'spec: ports', change to the appropriate port (we used 5432 for a Postgres instance)
  • in the airflow UI, add a connection of type Postgres with host set to the newly created service name.
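
As a rough illustration of the two edits described above, the sketch below builds the proxy's -instances argument and a Service manifest as plain Python data and prints the manifest as JSON, which kubectl apply -f also accepts. It assumes the v1 Cloud SQL Proxy flag format (-instances=<connection-name>=tcp:0.0.0.0:<port>) and a pod label of run=<name>; the names my-sqlproxy and my-sqlproxy-service are hypothetical, and the real selector should be copied from the existing airflow-sqlproxy-service.

```python
import json


def instances_flag(connection_name: str, port: int) -> str:
    """Format the v1 proxy's -instances flag, listening on all interfaces."""
    return f"-instances={connection_name}=tcp:0.0.0.0:{port}"


def service_manifest(name: str, namespace: str, app_label: str, port: int) -> dict:
    """Build a Service that routes to pods labelled run=<app_label> (assumed label)."""
    return {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "selector": {"run": app_label},
            "ports": [{"port": port, "targetPort": port, "protocol": "TCP"}],
        },
    }


# Placeholder connection name in <project>:<region>:<instance> form.
print(instances_flag("<project>:<region>:<instance>", 5432))
print(json.dumps(service_manifest("my-sqlproxy-service", "default", "my-sqlproxy", 5432), indent=2))
```

Setting the namespace explicitly in the manifest makes it obvious which full service path the Airflow connection has to use.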

Coelenterate answered 6/2, 2019 at 13:12 Comment(3)
Where can I find the YAML of airflow-sqlproxy and the service? - Siobhan
Under Kubernetes Engine > Services. - Coelenterate
This is also the way that worked best for me. However, IMPORTANT REMARK: the current version of Cloud Composer runs its components in their own namespace. Make sure that you change the default namespace in the YAML to the one used by Cloud Composer. Otherwise the DNS resolution will fail. - Courses
A
2

You can follow these instructions to launch a new Cloud SQL proxy instance in the cluster.

re #3: That sounds like a good plan. There isn't a Cloud SQL to BigQuery operator to my knowledge, so you'd have to do it in two phases like you described.
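
To make the two phases concrete, here is a minimal sketch of the request bodies involved: one for the Cloud SQL Admin API export (instances.export) and one for a BigQuery load job (the configuration.load section of jobs.insert). It only builds dictionaries; how they are submitted (Airflow operator, client library, or REST call) is left open. The function names and all example values are hypothetical.

```python
def cloud_sql_export_body(gcs_uri: str, database: str, select_query: str) -> dict:
    """Phase 1: export the result of a SELECT to a CSV file in Cloud Storage."""
    return {
        "exportContext": {
            "fileType": "CSV",
            "uri": gcs_uri,
            "databases": [database],
            "csvExportOptions": {"selectQuery": select_query},
        }
    }


def bigquery_load_config(gcs_uri: str, project: str, dataset: str, table: str) -> dict:
    """Phase 2: load that CSV file into a BigQuery table, replacing its contents."""
    return {
        "load": {
            "sourceUris": [gcs_uri],
            "destinationTable": {
                "projectId": project,
                "datasetId": dataset,
                "tableId": table,
            },
            "sourceFormat": "CSV",
            "writeDisposition": "WRITE_TRUNCATE",
            "autodetect": True,
        }
    }


# Both phases share the same Cloud Storage object (placeholder values).
uri = "gs://<bucket>/exports/orders.csv"
export_body = cloud_sql_export_body(uri, "<dbname>", "SELECT * FROM orders")
load_config = bigquery_load_config(uri, "<project-id>", "<dataset>", "orders")
```

Sharing one URI between the two bodies keeps the export target and the load source in sync.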

Archaism answered 17/5, 2018 at 21:10 Comment(3)
This is also how we connect to our (Postgres) Cloud SQL instance. - Cranston
I followed the instructions above and successfully connected to Cloud SQL using the MySQL CLI from the scheduler (after SSHing into it), but the pods that run the actual tasks fail to connect to it. They are scheduled in a node pool; should that make any difference? - Siobhan
These instructions suggest launching a proxy in a "sidecar" pattern, which is an issue if you have no control over the launched pods (e.g. if using the Kubernetes Operator). Check out this post for more info: medium.com/@ariklevliber/… - Siobhan
R
1

Adding the Medium post from @Leo's comment to the top level: https://medium.com/@ariklevliber/connecting-to-gcp-composer-tasks-to-cloud-sql-7566350c5f53 . Once you have followed that article and set up the service, you can connect from your DAG using SQLAlchemy like this:

import os
from datetime import datetime, timedelta
import logging

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

logger = logging.getLogger(os.path.basename(__file__))
INSTANCE_CONNECTION_NAME = "phil-new:us-east1:phil-db"

default_args = {
    'start_date': datetime(2019, 7, 16)
}


def connect_to_cloud_sql():
    '''
        Create a connection to CloudSQL
    :return:
    '''
    import sqlalchemy
    try:
        PROXY_DB_URL = "mysql+pymysql://<user>:<password>@<cluster_ip>:3306/<dbname>"
        logger.info("DB URL: %s", PROXY_DB_URL)
        engine = sqlalchemy.create_engine(PROXY_DB_URL, echo=True)
        for result in engine.execute("SELECT NOW() as now"):
            logger.info(dict(result))
    except Exception:
        logger.exception("Unable to interact with CloudSQL")


dag = DAG(
    dag_id="example_sqlalchemy",
    default_args=default_args,
    # schedule_interval=timedelta(minutes=5),
    catchup=False  # Without this, Airflow backfills a run for every interval since start_date
)


t1 = PythonOperator(
    task_id="example_sqlalchemy",
    python_callable=connect_to_cloud_sql,
    dag=dag
)


if __name__ == "__main__":
    connect_to_cloud_sql()
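
The PROXY_DB_URL placeholder above breaks if the password contains characters such as @ or /. A small helper that URL-escapes the credentials avoids that; this is a sketch using only the standard library, and the function name proxy_db_url and the example values are hypothetical.

```python
from urllib.parse import quote_plus


def proxy_db_url(user: str, password: str, host: str, dbname: str,
                 port: int = 3306, driver: str = "mysql+pymysql") -> str:
    """Build a SQLAlchemy database URL with URL-escaped credentials."""
    return (
        f"{driver}://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{dbname}"
    )


# Example with a password that needs escaping (made-up values).
url = proxy_db_url("composer", "p@ss/word",
                   "sqlproxy-service.default.svc.cluster.local", "mydb")
print(url)
```

The returned string can be passed to sqlalchemy.create_engine in place of the hard-coded URL.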
Robledo answered 19/7, 2019 at 12:39 Comment(0)
P
0

Here, in Hoffa's answer to a similar question, you can find a reference on how WePay keeps BigQuery synchronized with MySQL every 15 minutes using an Airflow operator.

From said answer:

Take a look at how WePay does this:

The MySQL to GCS operator executes a SELECT query against a MySQL table. The SELECT pulls all data greater than (or equal to) the last high watermark. The high watermark is either the primary key of the table (if the table is append-only), or a modification timestamp column (if the table receives updates). Again, the SELECT statement also goes back a bit in time (or rows) to catch potentially dropped rows from the last query (due to the issues mentioned above).

With Airflow they manage to keep BigQuery synchronized to their MySQL database every 15 minutes.
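
The high-watermark query described in the quote can be sketched as follows. This is not WePay's code; it is an illustrative helper that builds a parameterized SELECT, where the watermark is either a numeric primary key or a modification timestamp and the lookback value re-reads a little earlier data to catch rows missed by the previous run. The %s placeholder follows the parameter style of common MySQL drivers such as PyMySQL; the table and column names are made up.

```python
import re
from datetime import datetime, timedelta

_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def incremental_select(table, watermark_column, last_watermark, lookback):
    """Return (sql, params) selecting rows at or after the watermark minus a lookback."""
    for name in (table, watermark_column):
        # Identifiers cannot be bound as parameters, so validate them instead.
        if not _IDENTIFIER.match(name):
            raise ValueError(f"unsafe identifier: {name!r}")
    sql = (
        f"SELECT * FROM `{table}` "
        f"WHERE `{watermark_column}` >= %s "
        f"ORDER BY `{watermark_column}`"
    )
    return sql, (last_watermark - lookback,)


# Append-only table: the watermark is the primary key; step back 100 ids.
print(incremental_select("orders", "id", 5000, 100))
# Table with updates: the watermark is a modification timestamp; step back 5 minutes.
print(incremental_select("users", "updated_at",
                         datetime(2018, 6, 18, 7, 15), timedelta(minutes=5)))
```

Because the lookback re-reads some rows, the load into BigQuery has to tolerate duplicates, for example by deduplicating on the primary key.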

Petaloid answered 18/6, 2018 at 7:29 Comment(2)
While this link may answer the question, it is better to include the essential parts of the answer here and provide the link for reference. Link-only answers can become invalid if the linked page changes. - From Review - Swale
I have added the original text of the answer; I hope this makes it better. - Coddle
T
0

We can now export from Cloud SQL without running a Cloud SQL proxy ourselves. CloudSqlInstanceExportOperator calls the Cloud SQL Admin API, which writes the export directly to Cloud Storage. The code looks like this:

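from datetime import datetime

from airflow.models import DAG
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceExportOperator

# Placeholder values (not part of the original answer); replace with your own.
GCP_PROJECT_ID = '<project-id>'
INSTANCE_NAME = '<instance-name>'
DB_NAME = '<dbname>'
EXPORT_URI = 'gs://<bucket>/<path>/export.csv'
SQL = 'SELECT * FROM <table>'

# Request body for the Cloud SQL Admin API export call.
export_body = {
    'exportContext': {
        'fileType': 'CSV',
        'uri': EXPORT_URI,
        'databases': [DB_NAME],
        'csvExportOptions': {
            'selectQuery': SQL
        }
    }
}

# Airflow needs a start_date before it can schedule the task.
default_dag_args = {'start_date': datetime(2019, 10, 1)}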

with DAG(
        'postgres_test',
        schedule_interval='@once',
        default_args=default_dag_args) as dag:

    sql_export_task = CloudSqlInstanceExportOperator(
        project_id=GCP_PROJECT_ID,
        body=export_body,
        instance=INSTANCE_NAME,
        task_id='sql_export_task'
    )
Transfer answered 24/10, 2019 at 4:48 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.