Google Cloud Dataflow: Why does the pipeline run twice with DirectRunner?

Given the dataset below:

{"slot":"reward","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42544}
{"slot":"reward_dlg","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42545}

I am trying to filter out the records whose type is "ba" and insert them into BigQuery with the Python SDK:

import argparse
import json

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# Field names must match the keys emitted by ParseJsonDoFn below.
ba_schema = 'slot:STRING,result:INTEGER,p_type:STRING,level:INTEGER'

class ParseJsonDoFn(beam.DoFn):
    B_TYPE = 'tag_B'

    def process(self, element):
        text_line = element.strip()
        data = json.loads(text_line)

        # Keep only records of type 'ba' and emit them on a tagged output.
        if data['type'] == 'ba':
            ba = {'slot': data['slot'], 'result': data['result'],
                  'p_type': data['p_type'], 'level': data['level']}
            yield pvalue.TaggedOutput(self.B_TYPE, ba)

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='data/path/data',
                        help='Input file to process.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
      '--runner=DirectRunner',
      '--project=project-id',
      '--job_name=data-job',
    ])
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | ReadFromText(known_args.input)

        multiple_lines = (
            lines
            | 'ParseJSON' >> beam.ParDo(ParseJsonDoFn()).with_outputs(
                ParseJsonDoFn.B_TYPE))

        ba_line = multiple_lines.tag_B

        (ba_line
         | "output_ba" >> beam.io.Write(
             beam.io.BigQuerySink(
                 table='ba',
                 dataset='temp',
                 project='project-id',
                 schema=ba_schema,
                 # write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                 create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)))

        p.run().wait_until_finish()

The output is

/usr/local/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py:342: DeprecationWarning: options is deprecated since First stable release.. References to <pipeline>.options will not be supported
  pipeline.replace_all(_get_transform_overrides(pipeline.options))
INFO:root:Running pipeline with DirectRunner.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Writing 2 rows to project-id:temp.ba table.
INFO:root:Running pipeline with DirectRunner.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Writing 2 rows to project-id:temp.ba table.

Note that the line INFO:root:Writing 2 rows to project-id:temp.ba table. appears twice. Querying this table in BigQuery with

select * from `temp.ba`;

returns 4 records, i.e., every row has been inserted twice.

Why does the pipeline run the same job twice?

Ascariasis answered 11/9, 2018 at 7:21 Comment(1)
How are you triggering/calling your Python script? – Bedard

The with statement for Pipeline runs the pipeline when the block exits. Specifically:

with beam.Pipeline(...) as p:
  [...code...]

is equivalent to:

p = beam.Pipeline(...)
[...code...]
p.run().wait_until_finish()
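
Under the hood, Pipeline.__exit__ is what triggers the run. A simplified sketch of its behavior (not the exact Beam source):

class Pipeline(object):
    ...
    def __exit__(self, exc_type, exc_val, exc_tb):
        # Run the pipeline only if the with block exited without an error.
        if not exc_type:
            self.run().wait_until_finish()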

See the implementation.
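
So the code in the question runs the pipeline twice: once via the explicit p.run().wait_until_finish() inside the with block, and a second time when the with block exits. The fix is to drop the explicit call and let the context manager run the pipeline exactly once (a sketch using the question's own names):

with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromText(known_args.input)

    multiple_lines = (
        lines
        | 'ParseJSON' >> beam.ParDo(ParseJsonDoFn()).with_outputs(
            ParseJsonDoFn.B_TYPE))

    (multiple_lines.tag_B
     | "output_ba" >> beam.io.Write(
         beam.io.BigQuerySink(
             table='ba',
             dataset='temp',
             project='project-id',
             schema=ba_schema,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)))

    # No explicit p.run() here: leaving the with block runs the pipeline
    # and waits for it to finish.

Alternatively, keep the explicit p.run().wait_until_finish() and build the pipeline without the with statement; either way the pipeline runs once.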

Octarchy answered 11/9, 2018 at 18:20 Comment(2)
Thanks so much, I was stuck with the same problem :) it saved me big time – Pneumothorax
Hehe, that's not very intuitive. Thank you for the answer, saved me too – Assibilate
