I am trying to use Apache Beam with Python to fetch JSON data from an API and write it to a BigQuery table. Here is the code I am using:
import argparse
import json
import requests
import apache_beam as beam
from apache_beam.io import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--project', dest='project', required=True, help='GCP project')
    parser.add_argument('--region', dest='region', required=True, help='GCP region')
    parser.add_argument('--output', dest='output', required=True, help='Output BigQuery table')
    known_args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    p = beam.Pipeline(options=options)

    schema = 'postId:INTEGER, id:INTEGER, name:STRING, email:STRING, body:STRING'

    # Fetch comments from the API
    (p | 'Fetch comments' >> beam.Create([requests.get('https://jsonplaceholder.typicode.com/comments').text])
       | 'Load JSON' >> beam.Map(json.loads)
       | 'Flatten' >> beam.FlatMap(lambda x: x)
       | 'Map to BQ row' >> beam.Map(lambda x: {
           'postId': x['postId'],
           'id': x['id'],
           'name': x['name'],
           'email': x['email'],
           'body': x['body']
       })
       | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
           known_args.output,
           schema=schema,
           write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
    )

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    run()
However, I am encountering the following error:
Traceback (most recent call last):
  File "fetch_comments_beam.py", line 43, in <module>
    run()
  File "fetch_comments_beam.py", line 31, in run
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
  File "/usr/local/lib/python3.8/dist-packages/apache_beam/io/gcp/bigquery.py", line 1934, in __init__
    self.table_reference = bigquery_tools.parse_table_reference(
  File "/usr/local/lib/python3.8/dist-packages/apache_beam/io/gcp/bigquery_tools.py", line 244, in parse_table_reference
    if isinstance(table, TableReference):
TypeError: isinstance() arg 2 must be a type or tuple of types
I've tried different approaches, but the error persists. How can I resolve this issue?
I run the code with the following arguments:
python3 fetch_comments_beam.py --project onyx-osprey-251417 --region us-central1 --output onyx-osprey-251417:comments_dataset.comments
CLOSED: The fix was to install apache-beam[gcp] instead of plain apache-beam. The BigQuery connector relies on the GCP extras, which the plain package does not pull in.
pip install 'apache-beam[gcp]'
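For anyone who lands here with the same traceback, here is my best guess at why the message is so unhelpful. It is a minimal sketch, assuming the library guards an optional import and falls back to None when the GCP dependencies are missing; I have not traced Beam's source line by line. The module name hypothetical_gcp_client and the helpers is_table_reference, reproduce_error, and has_module are made up for illustration, and checking for the apitools module is only my assumption about what the gcp extra installs.

```python
import importlib.util

# Mimic an optional dependency: fall back to None when the import fails.
# "hypothetical_gcp_client" is a made-up module name used only for illustration.
try:
    from hypothetical_gcp_client import TableReference
except ImportError:
    TableReference = None


def is_table_reference(table):
    """isinstance() raises TypeError when TableReference fell back to None."""
    return isinstance(table, TableReference)


def reproduce_error():
    """Return the TypeError message triggered by the None fallback, if any."""
    try:
        is_table_reference("project:dataset.table")
    except TypeError as exc:
        return str(exc)
    return None


def has_module(name):
    """Check whether a top-level module can be found without importing it."""
    return importlib.util.find_spec(name) is not None


if __name__ == "__main__":
    print(reproduce_error())
    # Assumption: the gcp extra provides the "apitools" module.
    print("apitools available:", has_module("apitools"))
```

Running a check like has_module before building the pipeline gives a clearer signal than the isinstance() error, since it points at the missing package rather than at the table argument.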