GCP Dataflow Apache Beam code logic not working as expected

I am trying to implement CDC (change data capture) in Apache Beam, deployed on Google Cloud Dataflow.

I have loaded the master data and the new data, which is expected to arrive daily. The join is not working as expected; something is missing.

master_data = (
    p | 'Read base from BigQuery ' >> beam.io.Read(beam.io.BigQuerySource(query=master_data, use_standard_sql=True))
      | 'Map id in master' >> beam.Map(lambda master: (
          master['id'], master)))
new_data = (
    p | 'Read Delta from BigQuery ' >> beam.io.Read(beam.io.BigQuerySource(query=new_data, use_standard_sql=True))
      | 'Map id in new' >> beam.Map(lambda new: (new['id'], new)))

joined_dicts = (
    {'master_data' :master_data, 'new_data' : new_data }
    | beam.CoGroupByKey()
    | beam.FlatMap(join_lists)
    | 'mergeddicts' >> beam.Map(lambda masterdict, newdict: newdict.update(masterdict))
) 

def join_lists(k,v):
    itertools.product(v['master_data'], v['new_data'])

Observations (on sample data):

Data from the master

1, 'A',3232

2, 'B',234

New Data:

1,'A' ,44

4,'D',45

Expected result in master table, post the code implementation:

1, 'A',44

2, 'B',234

4,'D',45

However, what I am getting in master table is:

1,'A' ,44

4,'D',45

Am I missing a step? Can anyone please help me correct my mistake?

Mytilene answered 20/12, 2019 at 14:26 Comment(0)

You don't need the FlatMap after CoGroupByKey, as it separates the grouped elements again.

Here is the sample code.

from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam

def join_lists(e):
    # e is one CoGroupByKey result: (key, {'master_data': [...], 'new_data': [...]})
    (k, v) = e
    # Emit the new rows when they differ from the master rows, otherwise None.
    return (k, v['new_data']) if v['new_data'] != v['master_data'] else (k, None)

# Leaving the with-block runs the pipeline and waits for it to finish,
# so an explicit p.run() inside the block would run it a second time.
with beam.Pipeline(options=PipelineOptions()) as p:
    master_data = (
        p | 'Read base from BigQuery ' >> beam.Create([('A', [3232]),('B', [234])])
    )
    new_data = (
        p | 'Read Delta from BigQuery ' >> beam.Create([('A',[44]),('D',[45])])
    )

    joined_dicts = (
        {'master_data' :master_data, 'new_data' : new_data }
        | beam.CoGroupByKey()
        | 'mergeddicts' >> beam.Map(join_lists)
    )
print("Pipeline finished.")
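To illustrate why a FlatMap over itertools.product is a poor fit after CoGroupByKey, here is a minimal plain-Python sketch that does not use Beam at all. The `grouped` list is a hand-written stand-in for what CoGroupByKey would produce on the sample data, and the field name `val` and the helper `product_pairs` are hypothetical. It only shows that the product is empty whenever either side is empty, so it behaves like an inner join; it is not necessarily the full explanation of the output seen in the question.

```python
import itertools

# Hand-written stand-in for CoGroupByKey output on the sample data:
# one entry per key, each side holding a list of matching rows.
grouped = [
    (1, {'master_data': [{'id': 1, 'name': 'A', 'val': 3232}],
         'new_data': [{'id': 1, 'name': 'A', 'val': 44}]}),
    (2, {'master_data': [{'id': 2, 'name': 'B', 'val': 234}],
         'new_data': []}),
    (4, {'master_data': [],
         'new_data': [{'id': 4, 'name': 'D', 'val': 45}]}),
]

def product_pairs(element):
    """Pair every master row with every new row for one key."""
    _, v = element
    return list(itertools.product(v['master_data'], v['new_data']))

pairs_per_key = {k: product_pairs((k, v)) for k, v in grouped}

for key, pairs in pairs_per_key.items():
    # Only a key present on both sides yields any pair.
    print(key, len(pairs))
```

In this sketch only id 1 yields a pair; ids 2 and 4 each have one empty side, so the product contributes nothing for them.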
Taranto answered 2/1, 2020 at 22:33 Comment(4)
Hello Ankur, how do I also return (k, v['master_data']) along with your logic when v['new_data'] is None? - Mytilene
Can you elaborate and post this as a new question? - Taranto
Sure. I tried your suggested approach; however, I got 1,'A',44 and 4,'D',45, but I am still missing 2,'B',234 along with the 1 and 4 tuples, hence my comment. - Mytilene
I think that's what CDC does: 1,'A',44 and 4,'D',45 are the only changed values in the new data. Old values are not emitted, as they are not changes but original values. If you want the original values too, then you can use return (k, v['new_data']) if v['new_data'] else (k, v['master_data']) - Taranto
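The fallback suggested in the last comment can be sketched in plain Python as follows. The function name `pick_latest` is hypothetical, and the `grouped` list is a hand-written stand-in for the CoGroupByKey output of the answer's beam.Create inputs (each side is a list of the values seen for that key). This is a sketch under those assumptions, not a tested Dataflow job.

```python
def pick_latest(element):
    """Return (key, rows): the new rows when present, otherwise the master rows."""
    k, v = element
    return (k, v['new_data']) if v['new_data'] else (k, v['master_data'])

# Stand-in for CoGroupByKey output on the answer's sample inputs.
grouped = [
    ('A', {'master_data': [[3232]], 'new_data': [[44]]}),
    ('B', {'master_data': [[234]], 'new_data': []}),
    ('D', {'master_data': [], 'new_data': [[45]]}),
]

merged = [pick_latest(e) for e in grouped]
print(merged)
```

Within a pipeline, the same function could be applied with beam.Map(pick_latest) directly after beam.CoGroupByKey(), which would keep unchanged keys such as 'B' alongside the updated and newly added ones.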
