I'm currently building a PoC Apache Beam pipeline on GCP Dataflow. I want to create a streaming pipeline with the main input from PubSub and a side input from BigQuery, and store the processed data back in BigQuery.
Side pipeline code
side_pipeline = (
    p
    | "periodic" >> PeriodicImpulse(fire_interval=3600, apply_windowing=True)
    | "map to read request" >> beam.Map(
        lambda x: beam.io.gcp.bigquery.ReadFromBigQueryRequest(table=side_table))
    | beam.io.ReadAllFromBigQuery()
)
Function with side input code
def enrich_payload(payload, equipments):
    id = payload["id"]
    for equipment in equipments:
        if id == equipment["id"]:
            payload["type"] = equipment["type"]
            payload["brand"] = equipment["brand"]
            payload["year"] = equipment["year"]
            break
    return payload
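To sanity-check the lookup logic outside Dataflow, the function can be exercised with plain Python lists standing in for the side input. This is only a minimal sketch: the sample records and the names sample_equipments, matched and unmatched are made up for illustration and do not come from the real tables.

```python
def enrich_payload(payload, equipments):
    """Copy type/brand/year from the first equipment row whose id matches."""
    id = payload["id"]
    for equipment in equipments:
        if id == equipment["id"]:
            payload["type"] = equipment["type"]
            payload["brand"] = equipment["brand"]
            payload["year"] = equipment["year"]
            break
    return payload


# Hypothetical sample rows standing in for the BigQuery side input.
sample_equipments = [
    {"id": 1, "type": "pump", "brand": "Acme", "year": 2019},
    {"id": 2, "type": "valve", "brand": "Globex", "year": 2021},
]

# One payload with a matching id, one without.
matched = enrich_payload({"id": 2, "value": 10}, sample_equipments)
unmatched = enrich_payload({"id": 99, "value": 5}, sample_equipments)
print(matched)
print(unmatched)
```

A payload whose id has no match passes through unchanged, so it reaches the next step without the type, brand and year keys.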
Main pipeline code
main_pipeline = (
    p
    | "read" >> beam.io.ReadFromPubSub(topic="projects/my-project/topics/topiq")
    | "bytes to dict" >> beam.Map(lambda x: json.loads(x.decode("utf-8")))
    | "transform" >> beam.Map(transform_function)
    | "timestamping" >> beam.Map(lambda src: window.TimestampedValue(
        src,
        dt.datetime.fromisoformat(src["timestamp"]).timestamp()
    ))
    | "windowing" >> beam.WindowInto(window.FixedWindows(30))
)
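One thing worth checking in the "timestamping" step: datetime.timestamp() interprets a datetime without a UTC offset in the local time zone of the machine running it, so the resulting event time can differ between a laptop and a Dataflow worker. The sketch below shows one possible way to make the conversion explicit. The helper name to_unix_seconds is hypothetical, and treating offset-less strings as UTC is an assumption that may not match the real data.

```python
import datetime as dt


def to_unix_seconds(iso_string):
    """Parse an ISO-8601 string into Unix seconds.

    Assumption: strings without a UTC offset are meant as UTC.
    """
    parsed = dt.datetime.fromisoformat(iso_string)
    if parsed.tzinfo is None:
        # No offset in the string: pin it to UTC instead of local time.
        parsed = parsed.replace(tzinfo=dt.timezone.utc)
    return parsed.timestamp()


# Both strings describe the same instant, so both values should be equal.
naive_utc = to_unix_seconds("2022-01-01T00:00:00")
with_offset = to_unix_seconds("2022-01-01T07:00:00+07:00")
print(naive_utc, with_offset)
```

Under that assumption, the lambda in the "timestamping" step could call to_unix_seconds(src["timestamp"]) in place of the inline fromisoformat expression.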
final_pipeline = (
    main_pipeline
    | "enrich data" >> beam.Map(enrich_payload, equipments=beam.pvalue.AsIter(side_pipeline))
    | "store" >> beam.io.WriteToBigQuery(bq_table)
)

result = p.run()
result.wait_until_finish()
After deploying it to Dataflow, everything looks fine and there are no errors. But then I noticed that the enrich data step has two nodes instead of one.
Also, the side input appears to be stuck: it shows an Elements Added count of 21 in Input Collections and a "-" value for Elements Added in Output Collections.
You can find the full pipeline code here
I have already followed all the instructions in this documentation:
- https://beam.apache.org/documentation/patterns/side-inputs/
- https://beam.apache.org/releases/pydoc/2.35.0/apache_beam.io.gcp.bigquery.html
Yet I still run into this problem. Please help me. Thanks!
"timestamp" >> beam.Map(lambda src: window.TimestampedValue( # src,src["timestamp"]))
2. Use a subscription when reading from a topic: "read" >> beam.io.ReadFromPubSub(subscription=INPUT_SUBSCRIPTION). Fix those and check if it may help you to advance. – Heraldry