Start CloudSQL Proxy on Python Dataflow / Apache Beam

I am currently working on an ETL Dataflow job (using the Apache Beam Python SDK) which queries data from CloudSQL (with psycopg2 and a custom ParDo) and writes it to BigQuery. My goal is to create a Dataflow template which I can start from App Engine using a cron job.

I have a version which works locally using the DirectRunner. For that I use the CloudSQL (Postgres) proxy client so that I can connect to the database on 127.0.0.1.

When using the DataflowRunner with custom commands to start the proxy within a setup.py script, the job won't execute. It gets stuck repeating this log message:

Setting node annotation to enable volume controller attach/detach

Part of my setup.py looks like the following:

import logging
import subprocess

import setuptools

CUSTOM_COMMANDS = [
    ['echo', 'Custom command worked!'],
    ['wget', 'https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64', '-O', 'cloud_sql_proxy'],
    ['echo', 'Proxy downloaded'],
    ['chmod', '+x', 'cloud_sql_proxy']]


class CustomCommands(setuptools.Command):
  """A setuptools Command class able to run arbitrary commands."""

  def initialize_options(self):
    pass

  def finalize_options(self):
    pass

  def RunCustomCommand(self, command_list):
    print('Running command: %s' % command_list)
    logging.info("Running custom commands")
    p = subprocess.Popen(
        command_list,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # Can use communicate(input='y\n'.encode()) if the command run requires
    # some confirmation.
    stdout_data, _ = p.communicate()
    print('Command output: %s' % stdout_data)
    if p.returncode != 0:
      raise RuntimeError(
          'Command %s failed: exit code: %s' % (command_list, p.returncode))

  def run(self):
    for command in CUSTOM_COMMANDS:
      self.RunCustomCommand(command)
    # Start the proxy in the background (non-blocking) after downloading it.
    subprocess.Popen(['./cloud_sql_proxy', '-instances=bi-test-1:europe-west1:test-animal=tcp:5432'])

I added the final subprocess.Popen() call within run() after reading this issue on GitHub from sthomp and this discussion on Stack Overflow. I also tried to play around with some parameters of subprocess.Popen.
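For reference, a command class like this is usually hooked into setuptools.setup() through a custom build step and cmdclass, roughly as in the Apache Beam juliaset example. The package metadata and requirements below are placeholders, not taken from my actual file:

from distutils.command.build import build as _build

class build(_build):
  """A build command class that will be invoked during package install."""
  sub_commands = _build.sub_commands + [('CustomCommands', None)]

setuptools.setup(
    name='my_dataflow_job',         # placeholder package name
    version='0.0.1',
    packages=setuptools.find_packages(),
    install_requires=['psycopg2'],  # placeholder requirements
    cmdclass={
        'build': build,
        'CustomCommands': CustomCommands,
    })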

Another solution mentioned by brodin was to allow access from every IP address and to connect via username and password. As I understand it, he does not claim this to be best practice.

Thank you in advance for your help.

!!! Workaround solution at bottom of this post !!!


Update - Logfiles

These are the error-level logs that occur during the job:

E  EXT4-fs (dm-0): couldn't mount as ext3 due to feature incompatibilities 
E  Image garbage collection failed once. Stats initialization may not have completed yet: unable to find data for container / 
E  Failed to check if disk space is available for the runtime: failed to get fs info for "runtime": unable to find data for container / 
E  Failed to check if disk space is available on the root partition: failed to get fs info for "root": unable to find data for container / 
E  [ContainerManager]: Fail to get rootfs information unable to find data for container / 
E  Could not find capacity information for resource storage.kubernetes.io/scratch 
E  debconf: delaying package configuration, since apt-utils is not installed 
E    % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current 
E                                   Dload  Upload   Total   Spent    Left  Speed 
E  
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100  3698  100  3698    0     0  25674      0 --:--:-- --:--:-- --:--:-- 25860 



#-- HERE IS WHEN setup.py FOR MY JOB IS EXECUTED ---

E  debconf: delaying package configuration, since apt-utils is not installed 
E  insserv: warning: current start runlevel(s) (empty) of script `stackdriver-extractor' overrides LSB defaults (2 3 4 5). 
E  insserv: warning: current stop runlevel(s) (0 1 2 3 4 5 6) of script `stackdriver-extractor' overrides LSB defaults (0 1 6). 
E  option = Interval; value = 60.000000; 
E  option = FQDNLookup; value = false; 
E  Created new plugin context. 
E  option = PIDFile; value = /var/run/stackdriver-agent.pid; 
E  option = Interval; value = 60.000000; 
E  option = FQDNLookup; value = false; 
E  Created new plugin context. 

Here you can find all logs after the start of my custom setup.py (log level: any; all logs):

https://jpst.it/1gk2Z

Update - Logfiles 2

Job logs (I manually canceled the job after it was stuck for a while):

 2018-06-08 (08:02:20) Autoscaling is enabled for job 2018-06-07_23_02_20-5917188751755240698. The number of workers will b...
 2018-06-08 (08:02:20) Autoscaling was automatically enabled for job 2018-06-07_23_02_20-5917188751755240698.
 2018-06-08 (08:02:24) Checking required Cloud APIs are enabled.
 2018-06-08 (08:02:24) Checking permissions granted to controller Service Account.
 2018-06-08 (08:02:25) Worker configuration: n1-standard-1 in europe-west1-b.
 2018-06-08 (08:02:25) Expanding CoGroupByKey operations into optimizable parts.
 2018-06-08 (08:02:25) Combiner lifting skipped for step Save new watermarks/Write/WriteImpl/GroupByKey: GroupByKey not fol...
 2018-06-08 (08:02:25) Combiner lifting skipped for step Group watermarks: GroupByKey not followed by a combiner.
 2018-06-08 (08:02:25) Expanding GroupByKey operations into optimizable parts.
 2018-06-08 (08:02:26) Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
 2018-06-08 (08:02:26) Annotating graph with Autotuner information.
 2018-06-08 (08:02:26) Fusing adjacent ParDo, Read, Write, and Flatten operations
 2018-06-08 (08:02:26) Fusing consumer Get rows from CloudSQL tables into Begin pipeline with watermarks/Read
 2018-06-08 (08:02:26) Fusing consumer Group watermarks/Write into Group watermarks/Reify
 2018-06-08 (08:02:26) Fusing consumer Group watermarks/GroupByWindow into Group watermarks/Read
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/WriteBundles/WriteBundles into Save new watermar...
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/GroupByKey/GroupByWindow into Save new watermark...
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/GroupByKey/Reify into Save new watermarks/Write/...
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/GroupByKey/Write into Save new watermarks/Write/...
 2018-06-08 (08:02:26) Fusing consumer Write to BQ into Get rows from CloudSQL tables
 2018-06-08 (08:02:26) Fusing consumer Group watermarks/Reify into Write to BQ
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/Map(<lambda at iobase.py:926>) into Convert dict...
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/WindowInto(WindowIntoFn) into Save new watermark...
 2018-06-08 (08:02:26) Fusing consumer Convert dictionary list to single dictionary and json into Remove "watermark" label
 2018-06-08 (08:02:26) Fusing consumer Remove "watermark" label into Group watermarks/GroupByWindow
 2018-06-08 (08:02:26) Fusing consumer Save new watermarks/Write/WriteImpl/InitializeWrite into Save new watermarks/Write/W...
 2018-06-08 (08:02:26) Workflow config is missing a default resource spec.
 2018-06-08 (08:02:26) Adding StepResource setup and teardown to workflow graph.
 2018-06-08 (08:02:26) Adding workflow start and stop steps.
 2018-06-08 (08:02:26) Assigning stage ids.
 2018-06-08 (08:02:26) Executing wait step start25
 2018-06-08 (08:02:26) Executing operation Save new watermarks/Write/WriteImpl/DoOnce/Read+Save new watermarks/Write/WriteI...
 2018-06-08 (08:02:26) Executing operation Save new watermarks/Write/WriteImpl/GroupByKey/Create
 2018-06-08 (08:02:26) Starting worker pool setup.
 2018-06-08 (08:02:26) Executing operation Group watermarks/Create
 2018-06-08 (08:02:26) Starting 1 workers in europe-west1-b...
 2018-06-08 (08:02:27) Value "Group watermarks/Session" materialized.
 2018-06-08 (08:02:27) Value "Save new watermarks/Write/WriteImpl/GroupByKey/Session" materialized.
 2018-06-08 (08:02:27) Executing operation Begin pipeline with watermarks/Read+Get rows from CloudSQL tables+Write to BQ+Gr...
 2018-06-08 (08:02:36) Autoscaling: Raised the number of workers to 0 based on the rate of progress in the currently runnin...
 2018-06-08 (08:02:46) Autoscaling: Raised the number of workers to 1 based on the rate of progress in the currently runnin...
 2018-06-08 (08:03:05) Workers have started successfully.
 2018-06-08 (08:11:37) Cancel request is committed for workflow job: 2018-06-07_23_02_20-5917188751755240698.
 2018-06-08 (08:11:38) Cleaning up.
 2018-06-08 (08:11:38) Starting worker pool teardown.
 2018-06-08 (08:11:38) Stopping worker pool...
 2018-06-08 (08:12:30) Autoscaling: Reduced the number of workers to 0 based on the rate of progress in the currently runni...

Stack Traces:

No errors have been received in this time period.

Update: Workaround Solution can be found in my answer below

Leaseholder answered 5/6, 2018 at 13:46 Comment(11)
Could you provide us with the complete log and what the actual error is? Because just from Setting node annotation to enable volume controller attach/detach there's not much we can tell about what is happening and why. – Croupier
@Croupier thank you for your comment! Is there an appropriate way to provide you with the log files? The worker itself does not show any logs yet (maybe because it has not started). I cannot post all logs of system, kubelet, etc. here because they are too long. – Leaseholder
I would need you to provide me the logs of the Dataflow job that's failing. You can find them in the job logs https://console.cloud.google.com/dataflow?jobsDetail/locations/<ZONE>/jobs/<JOB_ID>?project=<PROJECT_NAME>. There should be some errors which should tell us what is going on. You don't have to post all the logs (just the most relevant ones). If there's too much you can use the justPasteIt (justpaste.it) tool to share them here. – Croupier
Updated post with logfiles (thanks for the tip with justpaste.it). I copied the logs from the Logs Viewer. Unfortunately I always landed at the job list when using your link above with my specifications. – Leaseholder
Thank you for that, but that's not really what I was after. Please post the Dataflow logs. Sorry for that link, this one should be the right one: https://console.cloud.google.com/dataflow/jobsDetail/locations/<ZONE>/jobs/<JOB_ID>?project=<PROJECT_NAME>. Find the logs of that job there and provide the stack trace. – Croupier
Thanks for the feedback. Added job logs and stack traces (which did not show any errors). Do you have any idea? – Leaseholder
Taking a closer look at what you are trying to achieve here, I realized that it's not possible at the moment because Apache Beam does not offer a connector (beam.apache.org/documentation/io/built-in) for Python. For a workaround you can take a look at this SO case (#46528843). However, my suggestion is to address this issue here. – Croupier
Thanks. My workaround used to be a custom ParDo where I use psycopg2 with the CloudSQL proxy. But my only problem is that the proxy won't work on Dataflow (it does, though, on my local machine). Do you have an idea how to get the CloudSQL proxy working within Dataflow? – Leaseholder
Right now, it is not possible to use the Cloud SQL proxy with Dataflow. I would go ahead and follow @komarkovich's advice and address the issue on GitHub. In the meantime, you can go ahead and follow his suggestions for a workaround. – Aseptic
Updated my post with the current workaround solution (using a connection via IP with SSL certificates). Thanks everyone for your help so far. Please feel free to comment on the current workaround. – Leaseholder
@ThomasSchmidt, since you have found a workaround for your issue, could you kindly post it as an answer and accept it, so that other users can benefit from it? Thank you. – Aseptic

Workaround Solution:

I finally found a workaround. The idea is to connect via the public IP of the CloudSQL instance. For that you need to allow connections to your CloudSQL instance from every IP:

  1. Go to the overview page of your CloudSQL instance in GCP
  2. Click on the Authorization tab
  3. Click on Add network and add 0.0.0.0/0 (!! this will allow every IP address to connect to your instance !!)

To add security to the process, I used SSL keys and only allowed SSL connections to the instance:

  1. Click on the SSL tab
  2. Click on Create a new certificate to create an SSL certificate for your server
  3. Click on Create a client certificate to create an SSL certificate for your client
  4. Click on Allow only SSL connections to reject all non-SSL connection attempts

After that, I stored the certificates in a Google Cloud Storage bucket and load them before connecting within the Dataflow job, i.e.:

import os
import select
import stat

import psycopg2
import psycopg2.extensions
from google.cloud import storage

# Function to wait for an open connection when processing in parallel
def wait(conn):
    while True:
        state = conn.poll()
        if state == psycopg2.extensions.POLL_OK:
            break
        elif state == psycopg2.extensions.POLL_WRITE:
            select.select([], [conn.fileno()], [])
        elif state == psycopg2.extensions.POLL_READ:
            select.select([conn.fileno()], [], [])
        else:
            raise psycopg2.OperationalError("poll() returned %s" % state)

# Function which returns a connection which can be used for queries
def connect_to_db(host, hostaddr, dbname, user, password, sslmode='verify-full'):

    # Get keys from GCS
    client = storage.Client()

    bucket = client.get_bucket('<YOUR_BUCKET_NAME>')

    bucket.get_blob('PATH_TO/server-ca.pem').download_to_filename('server-ca.pem')
    bucket.get_blob('PATH_TO/client-key.pem').download_to_filename('client-key.pem')
    # The client key must not be world-readable, otherwise libpq rejects it
    os.chmod("client-key.pem", stat.S_IRWXU)
    bucket.get_blob('PATH_TO/client-cert.pem').download_to_filename('client-cert.pem')

    sslrootcert = 'server-ca.pem'
    sslkey = 'client-key.pem'
    sslcert = 'client-cert.pem'

    con = psycopg2.connect(
        host=host,
        hostaddr=hostaddr,
        dbname=dbname,
        user=user,
        password=password,
        sslmode=sslmode,
        sslrootcert=sslrootcert,
        sslcert=sslcert,
        sslkey=sslkey)
    return con

I then use these functions in a custom ParDo to perform queries.
Minimal example:

import apache_beam as beam
from psycopg2.extras import RealDictCursor

class ReadSQLTableNames(beam.DoFn):
    '''
    ParDo class to get all table names of a given CloudSQL database.
    It will return each table name.
    '''
    def __init__(self, host, hostaddr, dbname, username, password):
        super(ReadSQLTableNames, self).__init__()
        self.host = host
        self.hostaddr = hostaddr
        self.dbname = dbname
        self.username = username
        self.password = password

    def process(self, element):

        # Connect to database
        con = connect_to_db(host=self.host,
            hostaddr=self.hostaddr,
            dbname=self.dbname,
            user=self.username,
            password=self.password)
        # Wait for free connection
        wait(con)
        # Create cursor to query data
        cur = con.cursor(cursor_factory=RealDictCursor)

        # Get all table names ("table" is quoted because it is a reserved word)
        cur.execute(
        """
        SELECT
        tablename AS "table"
        FROM pg_tables
        WHERE schemaname = 'public'
        """
        )
        table_names = cur.fetchall()

        cur.close()
        con.close()
        for table_name in table_names:
            yield table_name["table"]

Part of the pipeline could then look like this:

# Current workaround to query all tables: 
# Create a dummy initiator PCollection with one element
init = p        |'Begin pipeline with initiator' >> beam.Create(['All tables initializer'])

tables = init   |'Get table names' >> beam.ParDo(ReadSQLTableNames(
                                                host = known_args.host,
                                                hostaddr = known_args.hostaddr,
                                                dbname = known_args.db_name,
                                                username = known_args.user,
                                                password = known_args.password))

I hope this solution helps others with similar problems.

Leaseholder answered 13/6, 2018 at 8:10 Comment(4)
Does this method ensure that GCS's default encryption is retained in transit while the certificates are being downloaded to the Dataflow job? @Croupier – Dhow
So it is not possible to do it with the setup.py file and the proxy configuration? – Faber
@IoT I did not find a solution for the proxy yet. I hope there will be a nicer way in the future, because I recently got some problems with my workaround. Sometimes the downloaded file is empty and I needed to add some checks and retries. – Leaseholder
Thanks @ThomasSchmidt. I hope Google works harder on this, because it is too far behind the other two main cloud companies. – Faber

I managed to find a better, or at least easier, solution. In the DoFn setup() function, use the Cloud SQL proxy to set up the connection beforehand:

import os

import apache_beam as beam

class MyDoFn(beam.DoFn):
    def setup(self):
        # self.sql_args is assumed to be passed in via __init__
        os.system("wget https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64 -O cloud_sql_proxy")
        os.system("chmod +x cloud_sql_proxy")
        os.system(f"./cloud_sql_proxy -instances={self.sql_args['cloud_sql_connection_name']}=tcp:3306 &")
Farnesol answered 1/5, 2020 at 12:56 Comment(4)
The job throws the error "RuntimeError: mysql.connector.errors.InterfaceError: 2003: Can't connect to MySQL server on 'localhost:3306'", even though it could access the table. – Lime
For private-IP Dataflow, I think people may need to add the proxy file in Cloud Storage. – Blanco
@Blanco Cloud NAT would allow for the above solution with private-IP Dataflow, but if Cloud NAT is not an option then I agree the proxy file in Cloud Storage is a reasonable workaround. – Bonis
It helped me a lot. But in the last line, I added: "-dir=/cloudsql". Thanks! – Laurustinus

The easy and right thing to do in 2022 is to use the Cloud SQL Python Connector, which works with PostgreSQL, SQL Server, and MySQL running on Cloud SQL.

https://cloud.google.com/sql/docs/mysql/connect-connectors#python_1

https://pypi.org/project/cloud-sql-python-connector/

No need to whitelist IPs, manually load certificates, or leave your database wide open.

You use this format for the host: "project:region:instance" and connect as usual.
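A minimal sketch of how the connector is typically used for Postgres, here with the pg8000 driver and SQLAlchemy (the instance connection name and credentials below are placeholders; requires something like pip install "cloud-sql-python-connector[pg8000]" sqlalchemy):

import sqlalchemy
from google.cloud.sql.connector import Connector

connector = Connector()

def getconn():
    # "project:region:instance" and the credentials are placeholders.
    return connector.connect(
        "project:region:instance",
        "pg8000",
        user="my-user",
        password="my-password",
        db="my-db",
    )

# Connection pool that opens connections through the connector.
pool = sqlalchemy.create_engine("postgresql+pg8000://", creator=getconn)

with pool.connect() as conn:
    result = conn.execute(sqlalchemy.text("SELECT 1"))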

Monophysite answered 12/2, 2022 at 18:36 Comment(0)
