Write BigQuery results to GCS in CSV format using Apache Beam

I am pretty new to Apache Beam, and I am trying to write a pipeline to extract data from Google BigQuery and write it to GCS in CSV format using Python.

Using beam.io.Read(beam.io.BigQuerySource()) I am able to read the data from BigQuery, but I am not sure how to write it to GCS in CSV format.

Is there a custom function to achieve this? Could you please help me?

import logging

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryDisposition

PROJECT='project_id'
BUCKET='project_bucket'


def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=readwritebq',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]

    with beam.Pipeline(argv=argv) as p:

        # Execute the SQL in BigQuery and read the result set into the pipeline.
        BQ_SQL_TO_TABLE = p | 'read_bq_view' >> beam.io.Read(
            beam.io.BigQuerySource(query='SELECT * FROM `dataset.table`', use_standard_sql=True))
        # Extract data from BigQuery to GCS in CSV format.
        # This is where I need your help

        BQ_SQL_TO_TABLE | 'Write_bq_table' >> beam.io.WriteToBigQuery(
                table='tablename',
                dataset='datasetname',
                project='project_id',
                schema='name:string,gender:string,count:integer',
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=BigQueryDisposition.WRITE_TRUNCATE)

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Intermixture answered 22/10, 2018 at 12:27 Comment(1)
Welcome to Stack Overflow! Please take the tour and visit the help center to get the most out of this site. Please also share the relevant parts of the code you have developed so far. This helps in figuring out what the problem could be.Eddra

You can do so using WriteToText to add a .csv suffix and headers. Take into account that you'll need to parse the query results to CSV format. As an example, I used the Shakespeare public dataset and the following query:

SELECT word, word_count, corpus FROM `bigquery-public-data.samples.shakespeare` WHERE CHAR_LENGTH(word) > 3 ORDER BY word_count DESC LIMIT 10

We now read the query results with:

BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
    beam.io.BigQuerySource(query=query, use_standard_sql=True))

BQ_DATA now contains the rows as dictionaries of key-value pairs:

{u'corpus': u'hamlet', u'word': u'HAMLET', u'word_count': 407}
{u'corpus': u'kingrichardiii', u'word': u'that', u'word_count': 319}
{u'corpus': u'othello', u'word': u'OTHELLO', u'word_count': 313}

We can apply a beam.Map function to yield only values:

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())

Excerpt of BQ_VALUES:

[u'hamlet', u'HAMLET', 407]
[u'kingrichardiii', u'that', 319]
[u'othello', u'OTHELLO', 313]

Finally, map again to join the column values with commas instead of leaving them as a list (note that you would need to escape double quotes if they can appear within a field; see the sketch after the snippet for a safer option):

BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(
    lambda row: ', '.join(['"' + str(column) + '"' for column in row]))
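
If fields can contain embedded commas or double quotes, hand-rolled quoting gets fragile. A minimal alternative sketch (assuming Python 3; to_csv_line is a helper name introduced here, not part of the original answer) delegates the escaping to Python's csv module:

import csv
import io

def to_csv_line(row):
    # Serialize one list of column values into a properly quoted CSV line;
    # csv.writer escapes embedded quotes and commas for us.
    buf = io.StringIO()
    csv.writer(buf, quoting=csv.QUOTE_ALL).writerow(row)
    return buf.getvalue().rstrip('\r\n')

BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(to_csv_line)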

Now we write the results to GCS with the suffix and headers:

BQ_CSV | 'Write_to_GCS' >> beam.io.WriteToText(
    'gs://{0}/results/output'.format(BUCKET), file_name_suffix='.csv', header='corpus, word, word_count')

Written results:

$ gsutil cat gs://$BUCKET/results/output-00000-of-00001.csv
corpus, word, word_count
"hamlet", "HAMLET", "407"
"kingrichardiii", "that", "319"
"othello", "OTHELLO", "313"
"merrywivesofwindsor", "MISTRESS", "310"
"othello", "IAGO", "299"
"antonyandcleopatra", "ANTONY", "284"
"asyoulikeit", "that", "281"
"antonyandcleopatra", "CLEOPATRA", "274"
"measureforemeasure", "your", "274"
"romeoandjuliet", "that", "270"
Mullis answered 22/10, 2018 at 16:56 Comment(2)
On the same example, if I had to write the files from GCS to a local directory using subprocess (gsutil command), how would I achieve that in the pipeline?Intermixture
It should be noted that this solution relies on key ordering, since x.values() uses the dict's implicit ordering. That is a reasonable assumption for Python 3.7 onwards ( #1868361 ), but it is not clear whether BigQuerySource guarantees it for earlier versions, too.Citystate

For anyone looking for an update using Python 3, replace the line

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())

with

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: list(x.values()))
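
As the comment on the accepted answer points out, both variants rely on the dictionary preserving column order. A more defensive sketch (using the column names from the Shakespeare example above) selects the fields explicitly, so the CSV column order no longer depends on dict ordering:

COLUMNS = ['corpus', 'word', 'word_count']

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(
    lambda row: [row[col] for col in COLUMNS])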
Cohobate answered 3/3, 2021 at 20:54 Comment(0)
