Apache Beam Cloud Dataflow Streaming Stuck Side Input
I'm currently building a PoC Apache Beam pipeline on GCP Dataflow. I want to create a streaming pipeline whose main input comes from Pub/Sub and whose side input comes from BigQuery, and store the processed data back to BigQuery.

Side pipeline code

side_pipeline = (
    p
    | "periodic" >> PeriodicImpulse(fire_interval=3600, apply_windowing=True)
    | "map to read request" >> beam.Map(
        lambda x: beam.io.gcp.bigquery.ReadFromBigQueryRequest(table=side_table))
    | beam.io.ReadAllFromBigQuery()
)

Function with side input code

def enrich_payload(payload, equipments):
    payload_id = payload["id"]  # avoid shadowing the built-in id()
    for equipment in equipments:
        if payload_id == equipment["id"]:
            payload["type"] = equipment["type"]
            payload["brand"] = equipment["brand"]
            payload["year"] = equipment["year"]
            break

    return payload
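As an aside, the linear scan above runs once per incoming message. Since the side input is keyed by id, the lookup could be made O(1) by indexing the equipment records first (in Beam terms, passing the side input as beam.pvalue.AsDict of (id, record) pairs instead of AsIter). A minimal, Beam-free sketch of that idea; the function names are illustrative:

```python
def index_equipments(equipments):
    # Build an id -> record map once, so each payload lookup is O(1)
    # instead of a scan over the whole equipment list.
    return {e["id"]: e for e in equipments}


def enrich_payload(payload, equipment_by_id):
    # Drop-in variant of the scan-based enrich: same fields copied,
    # but via a single dict lookup.
    equipment = equipment_by_id.get(payload["id"])
    if equipment is not None:
        payload["type"] = equipment["type"]
        payload["brand"] = equipment["brand"]
        payload["year"] = equipment["year"]
    return payload
```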

Main pipeline code

main_pipeline = (
    p
    | "read" >> beam.io.ReadFromPubSub(topic="projects/my-project/topics/topiq")
    | "bytes to dict" >> beam.Map(lambda x: json.loads(x.decode("utf-8")))
    | "transform" >> beam.Map(transform_function)
    | "timestamping" >> beam.Map(lambda src: window.TimestampedValue(
        src,
        dt.datetime.fromisoformat(src["timestamp"]).timestamp()
    ))
    | "windowing" >> beam.WindowInto(window.FixedWindows(30))
)
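As a side check, the conversion done in the "timestamping" step can be exercised on its own. dt.datetime.fromisoformat accepts offset-aware strings like +00:00 (but not the Z suffix before Python 3.11); the sample timestamp below is made up:

```python
import datetime as dt


def event_epoch(payload):
    # Parse the ISO-8601 "timestamp" field and return Unix seconds,
    # the form expected by window.TimestampedValue.
    return dt.datetime.fromisoformat(payload["timestamp"]).timestamp()


event_epoch({"timestamp": "2022-01-03T04:48:00+00:00"})  # → 1641185280.0
```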

final_pipeline = (
    main_pipeline
    | "enrich data" >> beam.Map(enrich_payload, equipments=beam.pvalue.AsIter(side_pipeline))
    | "store" >> beam.io.WriteToBigQuery(bq_table)
)

result = p.run()
result.wait_until_finish()

After deploying it to Dataflow, everything looks fine and there are no errors. But then I noticed that the enrich data step has two nodes instead of one.

Dataflow Graph

Also, the side input is stuck: as you can see, it has 21 under Elements Added in Input Collections but a - value under Elements Added in Output Collections.

Enrich data stuck

You can find the full pipeline code here

I have already followed all the instructions in these documentations:

Yet I still ran into this issue. Please help me. Thanks!

Rescript answered 3/1, 2022 at 4:48 Comment(4)
Can you please provide the full Python pipeline code? So I can reproduce it easily. – Heraldry
@Heraldry Hi, I have already added a link to my full code. Thank you for your help! – Rescript
I spent some time on it, but it is still not writing at the last stage; it seems related to the windowing. I did, however, find some errors in other parts: 1. fix this step as: "timestamp" >> beam.Map(lambda src: window.TimestampedValue(src, src["timestamp"])), 2. use a subscription when reading from a topic: "read" >> beam.io.ReadFromPubSub(subscription=INPUT_SUBSCRIPTION). Fix those and check if they help you advance. – Heraldry
@Heraldry thank you so much bro! I'll check it. – Rescript

Here you have a working example:

import logging

import apache_beam as beam
from apache_beam import Map, ParDo, WindowInto
from apache_beam.io import ReadAllFromBigQuery, ReadFromPubSub
from apache_beam.transforms import trigger, window
from apache_beam.transforms.periodicsequence import PeriodicImpulse

# p is assumed to be a streaming beam.Pipeline created beforehand
mytopic = ""
sql = "SELECT station_id, CURRENT_TIMESTAMP() timestamp FROM `bigquery-public-data.austin_bikeshare.bikeshare_stations` LIMIT 10"

def to_bqrequest(e, sql):
    from apache_beam.io import ReadFromBigQueryRequest
    yield ReadFromBigQueryRequest(query=sql)


def merge(e, side):
    for i in side:
        yield f"Main {e.decode('utf-8')} Side {i}"

pubsub = p | "Read PubSub topic" >> ReadFromPubSub(topic=mytopic)

side_pcol = (p | PeriodicImpulse(fire_interval=300, apply_windowing=False)
               | "ApplyGlobalWindow" >> WindowInto(window.GlobalWindows(),
                                           trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5)),
                                           accumulation_mode=trigger.AccumulationMode.DISCARDING)
               | "To BQ Request" >> ParDo(to_bqrequest, sql=sql)
               | ReadAllFromBigQuery()
            )

final = (pubsub | "Merge" >> ParDo(merge, side=beam.pvalue.AsList(side_pcol))
                | Map(logging.info)
        )

p.run()

Note this uses a GlobalWindow (so that both inputs share the same window). I used a processing-time trigger so that the pane contains multiple rows. The value 5 was chosen arbitrarily; 1 would work too.
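The accumulation_mode choice matters here, since it decides what each fired pane contains. A toy, Beam-free simulation of the two modes (the function and argument names are illustrative, not Beam API):

```python
def fire_panes(elements_per_tick, accumulating=False):
    # Toy model of trigger pane contents:
    #   DISCARDING   -> each pane holds only elements since the last firing
    #   ACCUMULATING -> each pane holds everything seen so far
    seen, panes = [], []
    for batch in elements_per_tick:
        seen.extend(batch)
        panes.append(list(seen) if accumulating else list(batch))
    return panes


fire_panes([[1, 2], [3]])                     # → [[1, 2], [3]]
fire_panes([[1, 2], [3]], accumulating=True)  # → [[1, 2], [1, 2, 3]]
```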

Please note that matching the data between side and main inputs is non-deterministic, and you may see fluctuating values from older fired panes.

In theory, using FixedWindows should fix this, but I cannot get the FixedWindows to work.

Artful answered 12/1, 2022 at 11:17 Comment(4)
Thank you so much bro! I'll try it first – Rescript
Holee sheet dude, it's working!! Thank you so much! But unfortunately, the bounty ended just an hour after I tried your solution. Do you still really want the bounty? I will give it if you want – Rescript
Happy to see it worked! Oh, what a pity, I saw the question while I was on holidays and I was waiting to come back to start coding :D No need to create a new bounty, I'm just happy this worked for you – Serieswound
I think the reason FixedWindows is not working is that it triggers aggregation by the PCollection watermark. When the pipeline starts it only receives one element until the next PeriodicImpulse emits another (after an hour in my code), so the default trigger, trigger.AfterWatermark, never fires because no element watermark passed that window. That's why trigger.AfterProcessingTime works: it doesn't depend on the element watermark. – Rescript
