How to use GCP Cloud SQL as a Dataflow source and/or sink with Python?
Is there any guidance available to use Google Cloud SQL as a Dataflow read source and/or sink?

The Apache Beam Python SDK 2.1.0 documentation doesn't have a chapter on Google Cloud SQL, although there is one about BigQuery.

And as I read the tutorial Performing ETL from a Relational Database into BigQuery, I saw that it uses data exported to a file as the source in the process. That means there has to be an export step in between, and that's not ideal.

Are there any specific issues to take care of when using Cloud SQL, both as a source and as a sink?

Valley answered 2/10, 2017 at 15:9 Comment(5)
Have you tried JdbcIO?Playbill
Apologies: JdbcIO exists only in Java. For now, in Python, all I can recommend is mimicking the implementation of JdbcIO.read() using Python-specific database connectivity facilities - it is quite simple. That would be a very welcome contribution to Beam.Playbill
Dear jkff, thank you for the hint!Valley
You can easily write ParDo code that uses a standard SQL connector (e.g. the MySQL connector for Python) to write/read data to/from Cloud SQL; see the sketch after these comments. The downside is that you may run into problems with credentials for the DB, depending on your security policy (whether or not you allow connections to Cloud SQL from any WAN IP).Leitmotiv
Were you able to achieve what you wanted?Java
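Following the approach suggested in the comments above, here is a minimal, hedged sketch of using Cloud SQL as a sink: a plain ParDo that writes rows with mysql.connector. The table name, column names, and connection settings are hypothetical placeholders, there is no batching or retry logic, and this is not an official Beam connector.

import apache_beam as beam
import mysql.connector


class WriteToMySqlFn(beam.DoFn):
  """Hypothetical sink: inserts (a, b, c) tuples into a Cloud SQL (MySQL) table."""

  def __init__(self, **server_configuration):
    self.config = server_configuration

  def start_bundle(self):
    # One connection per bundle keeps connection churn low.
    self.connection = mysql.connector.connect(**self.config)
    self.cursor = self.connection.cursor()

  def process(self, row):
    # 'table1' and its columns are placeholders for your own schema.
    self.cursor.execute(
        'INSERT INTO table1 (a, b, c) VALUES (%s, %s, %s)', row)

  def finish_bundle(self):
    # Commit once per bundle, then release the connection.
    self.connection.commit()
    self.cursor.close()
    self.connection.close()

Usage would look something like rows | beam.ParDo(WriteToMySqlFn(host='...', user='...', password='...', database='...')), with the same credential/network caveats as mentioned above.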

The Beam Python SDK does not have a built-in transform to read data from a MySQL/Postgres database. Nonetheless, it should not be too troublesome to write a custom transform to do this. You can do something like this:

import apache_beam as beam

with beam.Pipeline() as p:
  query_result_pc = (p
                     # A single-element PCollection holding the query string.
                     | beam.Create(['select a,b,c from table1'])
                     # QueryMySqlFn (defined below) runs the query and emits rows.
                     | beam.ParDo(QueryMySqlFn(host='...', user='...'))
                     # Reshuffle so that downstream processing is parallelized.
                     | beam.Reshuffle())

To connect to MySQL, we'll use the mysql-specific library mysql.connector, but you can use the appropriate library for Postgres/etc.

Your querying function is:

import apache_beam as beam
import mysql.connector


class QueryMySqlFn(beam.DoFn):

  def __init__(self, **server_configuration):
    self.config = server_configuration

  def start_bundle(self):
    # Open one connection per bundle.
    self.mydb = mysql.connector.connect(**self.config)
    self.cursor = self.mydb.cursor()

  def process(self, query):
    # Run the query and emit each result row as an element.
    self.cursor.execute(query)
    for result in self.cursor:
      yield result

For Postgres, you would use psycopg2 or any other library that allows you to connect to it:

import apache_beam as beam
import psycopg2


class QueryPostgresFn(beam.DoFn):

  def __init__(self, **server_config):
    self.config = server_config

  def process(self, query):
    # Connect, run the query, and return all rows as the output elements.
    con = psycopg2.connect(**self.config)
    cur = con.cursor()

    cur.execute(query)
    rows = cur.fetchall()
    con.close()
    return rows

FAQ

  • Why do you have a beam.Reshuffle transform there? - Because the QueryMySqlFn does not parallelize reading data from the database. The reshuffle will ensure that our data is parallelized downstream for further processing.
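Building on that point, here is a hedged sketch of one way to parallelize the read itself, not only the downstream processing: split the query into several range queries (the id column and its bounds are hypothetical; in practice you might first query SELECT MIN(id), MAX(id)), fan them out with beam.Create, and Reshuffle before running them so different workers execute different ranges.

import apache_beam as beam

# Hypothetical key ranges over a numeric id column.
RANGES = [(0, 10000), (10000, 20000), (20000, 30000)]

queries = [
    'select a,b,c from table1 where id >= %d and id < %d' % (lo, hi)
    for (lo, hi) in RANGES
]

with beam.Pipeline() as p:
  rows = (p
          | beam.Create(queries)    # one element per range query
          | beam.Reshuffle()        # spread the queries across workers
          | beam.ParDo(QueryMySqlFn(host='...', user='...')))  # DoFn from above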
Islander answered 25/9, 2019 at 21:25 Comment(0)

There is a good library for SQL ingestion, https://github.com/pysql-beam/pysql-beam; please go through its examples. It supports RDBMSs like MySQL and PostgreSQL.

It provides read and write options; for example, we can read data from Google Cloud SQL like this:

from pysql_beam.sql_io.sql import ReadFromSQL

....
ReadFromSQL(host=options.host, port=options.port,
            username=options.username, password=options.password,
            database=options.database,
            query=options.source_query,
            wrapper=PostgresWrapper,
            batch=100000)
Drier answered 25/1, 2023 at 9:13 Comment(0)
