The Beam Python SDK does not have a built-in transform to read data from a MySQL/Postgres database. Nonetheless, it should not be too troublesome to write a custom transform to do this. You can do something like this:
with beam.Pipeline() as p:
  query_result_pc = (p
                     | beam.Create(['select a,b,c from table1'])
                     | beam.ParDo(QueryMySqlFn(host='...', user='...'))
                     | beam.Reshuffle())
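Downstream of the reshuffle, query_result_pc behaves like any other PCollection. As a small usage sketch (the row formatting and the output prefix are assumptions, not part of the original answer), you could continue inside the same with block:

  (query_result_pc
   | beam.Map(lambda row: ','.join(str(col) for col in row))
   | beam.io.WriteToText('query_output'))  # hypothetical output file prefix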
To connect to MySQL, we'll use the MySQL-specific library mysql.connector; for Postgres or another database, you would use the appropriate client library instead.
Your querying function is:
import apache_beam as beam
import mysql.connector


class QueryMySqlFn(beam.DoFn):

  def __init__(self, **server_configuration):
    self.config = server_configuration

  def start_bundle(self):
    # Open one connection per bundle instead of one per element.
    self.mydb = mysql.connector.connect(**self.config)
    self.cursor = self.mydb.cursor()

  def process(self, query):
    self.cursor.execute(query)
    for result in self.cursor:
      yield result
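Note that the DoFn above never closes its connection. As a hedged addition (the cleanup below is an assumption, not part of the original answer), you could release the cursor and connection when the bundle finishes:

  def finish_bundle(self):
    # Assumed cleanup: close the cursor and connection opened in start_bundle.
    self.cursor.close()
    self.mydb.close()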
For Postgres, you would use psycopg2, or any other library that allows you to connect to it:
import apache_beam as beam
import psycopg2


class QueryPostgresFn(beam.DoFn):

  def __init__(self, **server_config):
    self.config = server_config

  def process(self, query):
    # Opens a new connection per element; fine when each element is one query.
    con = psycopg2.connect(**self.config)
    cur = con.cursor()
    cur.execute(query)
    return cur.fetchall()
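It plugs into the same pipeline shape as the MySQL example; the connection parameters below are placeholders, not values from the original answer:

with beam.Pipeline() as p:
  rows = (p
          | beam.Create(['select a,b,c from table1'])
          | beam.ParDo(QueryPostgresFn(host='...', user='...',
                                       password='...', dbname='...'))
          | beam.Reshuffle())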
FAQ

- Why do you have a beam.Reshuffle transform there? Because the QueryMySqlFn does not parallelize reading data from the database. The reshuffle will ensure that our data is parallelized downstream for further processing.
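If you want the read itself to be parallelized as well, one option (a sketch under assumptions: the range predicates on column a are made up) is to Create several non-overlapping queries so that different workers execute different slices, and then reshuffle the combined results:

with beam.Pipeline() as p:
  queries = ['select a,b,c from table1 where a < 1000',   # hypothetical split
             'select a,b,c from table1 where a >= 1000']
  rows = (p
          | beam.Create(queries)
          | beam.ParDo(QueryMySqlFn(host='...', user='...'))
          | beam.Reshuffle())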
As noted in a comment, you can also write your own ParDo code which uses a standard SQL connector (i.e. mysql-connector for Python) to write/read data from Cloud SQL. The downside of this is that you will likely run into problems with credentials for the DB, depending on your security policy (whether you allow connections to Cloud SQL from any WAN IP or not). – Leitmotiv