How can I write to BigQuery using a runtime value provider in Apache Beam?

EDIT: I got this to work using beam.io.WriteToBigQuery with the experimental sink option turned on. I actually had it on already; my issue was that I was trying to "build" the full table reference from two value providers (dataset + table) wrapped in str(). That captured the value provider objects' string representation instead of letting the sink call their get() method at runtime to resolve the actual values.
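For anyone hitting the same thing, the change was roughly the following (a sketch relative to the code at the bottom of the post, not a complete pipeline - here --table is assumed to now carry the full DATASET.TABLE reference):

# Before (broken): wrapping ValueProviders in str() captures their repr,
# e.g. "RuntimeValueProvider(option: dataset, ...)", not the runtime value.
#   table = str(runtime_options.dataset) + '.' + str(runtime_options.table)

# After (works): pass the single ValueProvider straight to the sink, which
# calls .get() on it at runtime (requires the use_beam_bq_sink experiment).
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
        table=runtime_options.table,  # now holds e.g. 'my_dataset.my_table'
        schema=SCHEMA_ADFImpression,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)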

Original question:

I am trying to generate a Dataflow template to then call from a GCP Cloud Function. (For reference, my Dataflow job is supposed to read a file containing a bunch of filenames, then read all of those files from GCS and write them to BigQuery.) Because of this I need to write it in such a way that I can use runtime value providers to pass the BigQuery dataset/table.

My current code is at the bottom of this post, with some parts that aren't relevant to the question omitted. Pay attention to BQ_flexible_writer(beam.DoFn) specifically - that's where I am trying to "customise" beam.io.WriteToBigQuery so that it accepts the runtime value providers.

My template generates fine, and when I test-run the pipeline without supplying runtime variables (relying on the defaults) it succeeds and I see rows being added when looking at the job in the console. However, when I check BigQuery there is no data (I triple-checked that the dataset/table name in the logs is correct). Where does the data go, and what logging can I add to understand what's happening to the elements?

Any ideas what's happening here? Or suggestions on how I can write to BigQuery using runtime variables? Can I even call beam.io.WriteToBigQuery the way I've included it in my DoFn, or do I have to take the actual code behind beam.io.WriteToBigQuery and work with that?

[Image: My "successful" job DAG, showing elements being operated on by my custom BQ writer]

#=========================================================

class BQ_flexible_writer(beam.DoFn):
    def __init__(self, dataset, table):
        self.dataset = dataset
        self.table = table

    def process(self, element):
        dataset_res = self.dataset.get()
        table_res = self.table.get()
        logging.info('Writing to table: {}.{}'.format(dataset_res, table_res))
        beam.io.WriteToBigQuery(
            #dataset= runtime_options.dataset,
            table=str(dataset_res) + '.' + str(table_res),
            schema=SCHEMA_ADFImpression,
            project=str(PROJECT_ID),  #options.display_data()['project'],
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,  #'CREATE_IF_NEEDED': create if it does not exist
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND  #'WRITE_APPEND': add to existing rows, partitioning
        )
# https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#valueprovider
class FileIterator(beam.DoFn):
    def __init__(self, files_bucket):
        self.files_bucket = files_bucket

    def process(self, element):
        files = pd.read_csv(str(element), header=None).values[0].tolist()
        bucket = self.files_bucket.get()
        files = [str(bucket) + '/' + file for file in files]
        logging.info('Files list is: {}'.format(files))
        return files

# https://mcmap.net/q/1914928/-ways-of-using-value-provider-parameter-in-python-apache-beam   
class OutputValueProviderFn(beam.DoFn):
    def __init__(self, vp):
        self.vp = vp

    def process(self, unused_elm):
        yield self.vp.get()


class RuntimeOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):

        parser.add_value_provider_argument(
          '--dataset',
          default='EDITED FOR PRIVACY',
          help='BQ dataset to write to',
          type=str)

        parser.add_value_provider_argument(
          '--table',
          default='EDITED FOR PRIVACY',
          required=False,
          help='BQ table to write to',
          type=str)

        parser.add_value_provider_argument(
          '--filename',
          default='EDITED FOR PRIVACY',
          help='Filename of batch file',
          type=str)

        parser.add_value_provider_argument(
          '--batch_bucket',
          default='EDITED FOR PRIVACY',
          help='Bucket for batch file',
          type=str)

        #parser.add_value_provider_argument(
        #   '--bq_schema',
          #default='gs://dataflow-samples/shakespeare/kinglear.txt',
        #  help='Schema to specify for BQ')

        #parser.add_value_provider_argument(
        #   '--schema_list',
          #default='gs://dataflow-samples/shakespeare/kinglear.txt',
        #  help='Schema in list for processing')

        parser.add_value_provider_argument(
          '--files_bucket',
          default='EDITED FOR PRIVACY',
          help='Bucket where the raw files are',
          type=str)

        parser.add_value_provider_argument(
          '--complete_batch',
          default='EDITED FOR PRIVACY',
          help='Bucket where the raw files are',
          type=str)
#=========================================================

def run():
    #====================================
    # TODO PUT AS PARAMETERS 
    #====================================
    JOB_NAME_READING = 'adf-reading'
    JOB_NAME_PROCESSING = 'adf-'

    job_name = '{}-batch--{}'.format(JOB_NAME_PROCESSING,_millis())

    pipeline_options_batch = PipelineOptions()

    runtime_options = pipeline_options_batch.view_as(RuntimeOptions)

    setup_options = pipeline_options_batch.view_as(SetupOptions)
    setup_options.setup_file  = './setup.py'
    google_cloud_options = pipeline_options_batch.view_as(GoogleCloudOptions)
    google_cloud_options.project = PROJECT_ID
    google_cloud_options.job_name = job_name
    google_cloud_options.region = 'europe-west1'
    google_cloud_options.staging_location = GCS_STAGING_LOCATION
    google_cloud_options.temp_location = GCS_TMP_LOCATION


    #pipeline_options_batch.view_as(StandardOptions).runner = 'DirectRunner'

    # # If datflow runner [BEGIN]
    pipeline_options_batch.view_as(StandardOptions).runner = 'DataflowRunner'
    pipeline_options_batch.view_as(WorkerOptions).autoscaling_algorithm = 'THROUGHPUT_BASED'

    #pipeline_options_batch.view_as(WorkerOptions).machine_type = 'n1-standard-96' #'n1-highmem-32' #' 
    pipeline_options_batch.view_as(WorkerOptions).max_num_workers = 10
    #  [END]

    pipeline_options_batch.view_as(SetupOptions).save_main_session = True
    #Needed this in order to pass table to BQ at runtime
    pipeline_options_batch.view_as(DebugOptions).experiments = ['use_beam_bq_sink']


    with beam.Pipeline(options=pipeline_options_batch) as pipeline_2:

        try:

            final_data = (
            pipeline_2
            |'Create empty PCollection' >> beam.Create([None])
            |'Get accepted batch file 1/2:{}'.format(OutputValueProviderFn(runtime_options.complete_batch)) >> beam.ParDo(OutputValueProviderFn(runtime_options.complete_batch))
            |'Get accepted batch file 2/2:{}'.format(OutputValueProviderFn(runtime_options.complete_batch)) >> beam.ParDo(FileIterator(runtime_options.files_bucket))
            |'Read all files' >> beam.io.ReadAllFromText(skip_header_lines=1)
            |'Process all files' >> beam.ParDo(ProcessCSV(),COLUMNS_SCHEMA_0)
            |'Format all files' >> beam.ParDo(AdfDict())
            #|'WriteToBigQuery_{}'.format('test'+str(_millis())) >> beam.io.WriteToBigQuery(
            #        #dataset= runtime_options.dataset,
            #        table = str(runtime_options.dataset) + '.' + str(runtime_options.table), 
            #        schema = SCHEMA_ADFImpression,
            #        project = pipeline_options_batch.view_as(GoogleCloudOptions).project, #options.display_data()['project'],
            #        create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,  #'CREATE_IF_NEEDED',#create if does not exist.
            #        write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND    #'WRITE_APPEND' #add to existing rows, partitioning
            #        )
            |'WriteToBigQuery' >> beam.ParDo(BQ_flexible_writer(runtime_options.dataset,runtime_options.table))
            )
        except Exception as exception:
            logging.error(exception)
            pass
Asked by Broadbent on 6/12/2019 at 17:37. Comments (3):
Your Beam pipeline succeeds but there is no data in your dataset/table? How do you perform the check in BigQuery - do you go to the preview tab? Do you know the dataset/table at the beginning of the workflow, or only in the middle of the process? (Domestic)
I am facing the same issue while trying to use WriteToBigQuery from a ParDo function. Please let me know if you have found a solution to this. I guess WriteToBigQuery expects a PCollection rather than a row-wise approach, and that is the reason it cannot be executed from a ParDo fn. (Haul)
@BhaskarBhuyan Yeah, Chamikara below confirmed it does not work. Can you not just add the BQ I/O adapter after your ParDo? (Broadbent)

Please run this with the following additional option:

--experiment=use_beam_bq_sink

Without this, Dataflow currently overrides the BigQuery sink with a native version that does not support ValueProviders.

Additionally, note that setting the dataset as a runtime parameter is not supported. Try specifying the table parameter as an entire table reference (DATASET.TABLE or PROJECT:DATASET.TABLE) instead.
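A minimal, self-contained sketch along these lines (the option name --table, the dummy row and the schema below are placeholders, not taken from the question):

import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions


class TemplateOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # One runtime parameter holding the FULL table reference
        # (DATASET.TABLE or PROJECT:DATASET.TABLE); the dataset is not
        # passed as a separate runtime parameter.
        parser.add_value_provider_argument(
            '--table',
            type=str,
            help='Full BigQuery table reference, e.g. my_dataset.my_table')


def run():
    options = PipelineOptions()
    # Equivalent to passing --experiment=use_beam_bq_sink on the command line.
    options.view_as(DebugOptions).experiments = ['use_beam_bq_sink']
    runtime = options.view_as(TemplateOptions)

    with beam.Pipeline(options=options) as p:
        (p
         | 'Create rows' >> beam.Create([{'field_1': 'value_1'}])
         | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
               table=runtime.table,  # ValueProvider passed as-is; get() is called at runtime
               schema='field_1:STRING',
               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()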

Answered by Huan on 6/12/2019 at 23:22. Comments (8):
Thanks. I already have this, just before the beginning of my pipeline, and I see it listed in the experiments variable for the job in the Dataflow console. Can you confirm whether my code for it looks OK? (Broadbent)
Seems like you are trying to invoke a transform from a DoFn.process() method directly. That will not work. The Beam WriteToBigQuery transform (as long as you use the above experiment) allows specifying the full table reference (including the dataset) in the 'table' parameter as a runtime value provider. So you should be able to use that directly. (Huan)
That's how it was initially; check out the commented-out part of the pipeline. I was getting the error about trying to access a runtime variable from the wrong context. I'll try again tomorrow and post the actual error and a screenshot of the job showing that the experimental option is on. (Broadbent)
Could be because you tried to set dataset as a runtime parameter. Note that this is not supported. Try specifying the table parameter as an entire table reference (DATASET.TABLE or PROJECT:DATASET.TABLE) instead. (Huan)
I had done this (building the full table reference from two value providers) but thought I'd try supplying a single one instead. When trying to generate the template I get: ERROR:root:Expected a table reference (PROJECT:DATASET.TABLE or DATASET.TABLE) instead of RuntimeValueProvider(option: comp_table, type: str, default_value: 'Test.Test'). Any other ideas? It seems that despite using the experimental feature, WriteToBigQuery just isn't accepting runtime value providers? (Broadbent)
@GeorgeS I see that you have edited your question afterwards and it appears to be resolved. Can you please confirm that the given answer suits you well? Is there anything missing regarding your initial problem? (Mcconnell)
@JuanFernándezAfonso The answer together with the comments got me to solve my issue (i.e. trying to invoke a transform from a DoFn doesn't work, and beam.io.WriteToBigQuery with use_beam_bq_sink should work). Probably the one bit of missing information is how the value provider arguments are handled by beam.io.WriteToBigQuery - specifically, that you need to pass a single value provider containing a complete table reference (DATASET.TABLE or PROJECT:DATASET.TABLE), on which beam.io.WriteToBigQuery calls the get() method. (Broadbent)
@Huan Regarding "note that setting the dataset as a runtime parameter is not supported": is this also the case with ReadFromBigQuery in Python? I am getting an error while trying to read from a BigQuery table with the value of the table being supplied at runtime. Referring to this question -> #67441816 (Rillings)
