"TypeError: isinstance() arg 2 must be a type or tuple of types" while using WriteToBigQuery in Apache Beam

I am trying to use Apache Beam with Python to fetch JSON data from an API and write it to a BigQuery table. Here is the code I am using:

import argparse
import json
import requests
import apache_beam as beam
from apache_beam.io import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--project', dest='project', required=True, help='GCP project')
    parser.add_argument('--region', dest='region', required=True, help='GCP region')
    parser.add_argument('--output', dest='output', required=True, help='Output BigQuery table')
    known_args, pipeline_args = parser.parse_known_args(argv)
    options = PipelineOptions(pipeline_args)
    p = beam.Pipeline(options=options)

    schema = 'postId:INTEGER, id:INTEGER, name:STRING, email:STRING, body:STRING'

    # Fetch comments from the API
    (p | 'Fetch comments' >> beam.Create([requests.get('https://jsonplaceholder.typicode.com/comments').text])
       | 'Load JSON' >> beam.Map(json.loads)
       | 'Flatten' >> beam.FlatMap(lambda x: x)
       | 'Map to BQ row' >> beam.Map(lambda x: {
            'postId': x['postId'],
            'id': x['id'],
            'name': x['name'],
            'email': x['email'],
            'body': x['body']
       })
       | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
           known_args.output,
           schema=schema,
           write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
    )

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    run()

However, I am encountering the following error:

Traceback (most recent call last):
  File "fetch_comments_beam.py", line 43, in <module>
    run()
  File "fetch_comments_beam.py", line 31, in run
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
  File "/usr/local/lib/python3.8/dist-packages/apache_beam/io/gcp/bigquery.py", line 1934, in __init__
    self.table_reference = bigquery_tools.parse_table_reference(
  File "/usr/local/lib/python3.8/dist-packages/apache_beam/io/gcp/bigquery_tools.py", line 244, in parse_table_reference
    if isinstance(table, TableReference):
TypeError: isinstance() arg 2 must be a type or tuple of types

I've tried different approaches, but the error persists. How can I resolve this issue?

I run the code with the following arguments:

python3 fetch_comments_beam.py --project onyx-osprey-251417 --region us-central1 --output onyx-osprey-251417:comments_dataset.comments

CLOSED: I just had to install apache-beam[gcp] instead of plain apache-beam.
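
A likely explanation for the odd message (an assumption, not verified against this Beam version's source): when the GCP client libraries are missing, an optional import fails quietly and the name TableReference ends up bound to None, so the later isinstance() check receives None instead of a class. The sketch below reproduces that pattern in isolation; the module name beam_gcp_client_that_is_missing and the helper parse_table are made up for illustration and are not Beam APIs.

```python
# Hypothetical stand-in for an optional dependency that is not installed.
try:
    from beam_gcp_client_that_is_missing import TableReference
except ImportError:
    # Fallback when the import fails: the name exists, but it is not a type.
    TableReference = None


def parse_table(table):
    """Mimic a type check against the optionally imported class."""
    if isinstance(table, TableReference):  # raises TypeError when TableReference is None
        return table
    return str(table)


error_message = None
try:
    parse_table("project:dataset.table")
except TypeError as exc:
    # Same kind of error as in the traceback above; exact wording varies by Python version.
    error_message = str(exc)

print(error_message)
```

With the gcp extra installed the import succeeds, the name is bound to a real class, and the check behaves normally.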

pip install "apache-beam[gcp]"
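
To confirm that the extra really landed in the interpreter that runs the pipeline, a quick check along these lines may help. It is only a sketch: the module names in GCP_MODULES are my assumption about what the gcp extra pulls in, and missing_modules is a hypothetical helper, not part of Beam.

```python
import importlib.util


def missing_modules(names):
    """Return the module names from `names` that cannot be located."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # A missing parent package (e.g. "google.cloud") raises instead of returning None.
            found = False
        if not found:
            missing.append(name)
    return missing


# Assumed to be provided by the gcp extra; adjust for your Beam version.
GCP_MODULES = ["apitools", "google.cloud.bigquery"]

if __name__ == "__main__":
    gaps = missing_modules(GCP_MODULES)
    if gaps:
        print("Missing modules, the gcp extra may not be installed:", gaps)
    else:
        print("GCP client modules found.")
```

Run it with the same python3 that launches fetch_comments_beam.py, since pip and python3 can point at different environments.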
Gang answered 24/4, 2023 at 10:54
