Dataflow GCS to BigQuery - How to output multiple rows per input?

Currently I am using the Google-provided gcs-text-to-bigquery template and feeding in a transform function to transform my JSONL file. The JSONL is pretty nested, and I want to be able to output multiple rows per one row of the newline-delimited JSON by doing some transforms.

For example:

{'state': 'FL', 'metropolitan_counties': [{'name': 'miami dade', 'population': 100000}, {'name': 'county2', 'population': 100000}, …], 'rural_counties': [{'name': 'county1', 'population': 100000}, {'name': 'county2', 'population': 100000}, …], 'total_state_pop': 10000000, …}

There will obviously be more than two counties, and each state will have one of these lines. The output my boss wants is:

State  County_Type   County_Name  County_Pop  State_Pop
FL     Metropolitan  miami dade   100000      10000000
FL     Metropolitan  county2      100000      10000000
FL     Rural         county1      100000      10000000
FL     Rural         county2      100000      10000000
…

When I do the gcs-to-bq text transform, I end up getting only one line per state (so I'll get Miami-Dade county from FL, and then whatever the first county is in my transform for the next state). I read a little, and I think this is because the mapping in the template expects one output per JSON line. It seems I can do a ParDo (with a DoFn? not sure what that is), or there is a similar option with beam.Map in Python. There is some business logic in the transforms (right now it's about 25 lines of code, as the JSON has more columns than I showed, but those are pretty simple).
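(For context, the relevant difference: beam.Map emits exactly one output element per input, while beam.FlatMap, or a ParDo whose DoFn yields multiple values, can emit any number. A minimal sketch with made-up transforms:

records | 'One per input' >> beam.Map(lambda x: x * 2)
records | 'Many per input' >> beam.FlatMap(lambda x: [x, x])
)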

Any suggestions on this? Data is coming in tonight/tomorrow, and there will be hundreds of thousands of rows in a BQ table.

The template I am using is currently in Java, but I can translate it to Python pretty easily, as there are a lot of examples online in Python. I know Python better, and I think it's easier given the different types (sometimes a field can be null), and it seems less daunting given that the examples I saw look simpler. However, I'm open to either.

Optional answered 6/11, 2018 at 16:12

Solving that in Python is somewhat straightforward. Here's one possibility (not fully tested):

from __future__ import absolute_import

import ast
import os

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/service_account.json'

pipeline_args = [
    '--job_name=test'
]

pipeline_options = PipelineOptions(pipeline_args)


def jsonify(element):
    # The sample lines use single-quoted keys, which json.loads rejects,
    # so parse each line as a Python literal instead.
    return ast.literal_eval(element)


def unnest(element):
    # Emit one flat row per county; skip records missing required fields.
    state = element.get('state')
    state_pop = element.get('total_state_pop')
    if state is None or state_pop is None:
        return
    for type_ in ['metropolitan_counties', 'rural_counties']:
        county_type = (
            'Metropolitan' if type_ == 'metropolitan_counties' else 'Rural'
        )
        for e in element.get(type_, []):
            name = e.get('name')
            pop = e.get('population')
            if name is None or pop is None:
                continue
            yield {
                'State': state,
                'County_Type': county_type,
                'County_Name': name,
                'County_Pop': pop,
                'State_Pop': state_pop
            }

with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromText('gs://url to file')

    schema = 'State:STRING,County_Type:STRING,County_Name:STRING,County_Pop:INTEGER,State_Pop:INTEGER'

    data = (
        lines
        | 'Jsonify' >> beam.Map(jsonify)
        | 'Unnest' >> beam.FlatMap(unnest)
        | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(
            'project_id:dataset_id.table_name', schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
    )

This will only succeed if you are working with batch data. If you have streaming data, just change beam.io.Write(beam.io.BigQuerySink(...)) to beam.io.WriteToBigQuery.
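For reference, a minimal sketch of that swap, reusing the same schema string and the same placeholder table spec as above:

        | 'Write to BQ' >> beam.io.WriteToBigQuery(
            'project_id:dataset_id.table_name', schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)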

Doubletongued answered 6/11, 2018 at 23:18
Thanks so much – how would I run this on GCS (not locally)? I changed the with beam... block into a function, added an if __name__ == '__main__' guard to call the run function, and added arguments in. It doesn't technically need any inputs (for now), but I can't figure out how to run it. – Optional
Do you mean to run it retrieving the file from GCS, or using Dataflow as the runner? – Doubletongued
Yeah, I changed the template a little based on this thread: #47822442 – Optional
But I'm still not sure how to run it (just once for now, as a test). – Optional
If you want to test it locally you can just invoke it with python file_name.py. Running on Dataflow requires passing the argument --runner=DataflowRunner in the pipeline options. – Doubletongued
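(For example, a minimal set of pipeline options for a Dataflow run might look like the following; the project, bucket, and region values are placeholders:

pipeline_args = [
    '--runner=DataflowRunner',
    '--project=my-project-id',             # placeholder
    '--temp_location=gs://my-bucket/tmp',  # placeholder
    '--region=us-central1',                # placeholder
    '--job_name=test',
]
)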
If I pass in a file path to a whole folder, like 'gs://url to file/*.json' (or .csv, whichever the files are), is there a way to get the current filename? I need the filenames to be a column in BQ. – Optional
Not sure how you plan to do so. I recommend asking another question with specific details about what you want to do, so it'll be easier to help you. – Doubletongued
Just posted it as another question, if you have any thoughts: #53329098 – Optional
