How to read bigQuery from PCollection in Dataflow

I have a PCollection of objects that I get from Pub/Sub, say:

 PCollection<Student> pStudent;

Among the Student attributes there is one, say studentID. I want to read another attribute (class_code) from BigQuery using that student ID, and then set the class_code returned from BQ on the Student object in the PCollection.

Does anyone know how to implement this? I know Beam has BigQueryIO, but how can I use it when the query I want to run against BQ depends on a value (studentID) from the objects in the PCollection, and how can I set the value from the BigQuery result back onto the objects in the PCollection?

Dagon answered 8/11, 2018 at 6:22 Comment(1)
You don't want to use BigQuery for this use case, because BigQuery is not an OLTP database and this use case seems to need transactional properties from the database. Of course, you can fetch the BigQuery table as a mapping table for your Dataflow job, but then the data will either stay immutable or you'll end up querying BigQuery so many times that it might cost you unnecessarily. If you want to use something from GCP for this purpose, use Cloud SQL or Cloud Datastore.Oconnell

I considered two options to do this. One would be using BigQueryIO to read the whole table and materialize it as a side input or use CoGroupByKey to join all the data. Another possibility, the one I implemented herein, is to use the Java Client Library directly.
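
The side-input variant mentioned above boils down to reading the whole table once (e.g. with BigQueryIO.readTableRows()), materializing it as a Map via View.asMap(), and doing a plain map lookup per element inside a DoFn. Setting the Beam plumbing aside, the per-element enrichment logic is just this; a stdlib-only sketch (class and method names are mine, and the hard-coded map stands in for the side input):

```java
import java.util.HashMap;
import java.util.Map;

class SideInputSketch {

    // Stand-in for the Map<String, String> a View.asMap() side input would
    // provide: the whole students table, materialized once, keyed by name.
    static final Map<String, String> GRADES = new HashMap<>();
    static {
        GRADES.put("Yoda", "A+");
        GRADES.put("Leia", "B+");
        GRADES.put("Luke", "C-");
        GRADES.put("Chewbacca", "F");
    }

    // What the DoFn's @ProcessElement body would do with the side input:
    // enrich the element with its grade, with a default for missing keys.
    static String lookupGrade(String student) {
        return GRADES.getOrDefault(student, "UNKNOWN");
    }

    public static void main(String[] args) {
        for (String s : new String[] {"Luke", "Leia", "Han"}) {
            System.out.println(s + " " + lookupGrade(s));
        }
    }
}
```

This trades one bulk read (and the memory to hold the table) for per-element queries, which is usually the better deal when the lookup table is small and changes rarely.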

I created some dummy data with:

$ bq mk test.students name:STRING,grade:STRING
$ bq query --use_legacy_sql=false 'insert into test.students (name, grade) values ("Yoda", "A+"), ("Leia", "B+"), ("Luke", "C-"), ("Chewbacca", "F")'

which looks like this:

    name       grade
    Yoda       A+
    Leia       B+
    Luke       C-
    Chewbacca  F

and then, within the pipeline, I generate some input dummy data:

Create.of("Luke", "Leia", "Yoda", "Chewbacca")

For each one of these "students" I fetch the corresponding grade from the BigQuery table, following the approach in this example. Take into account, depending on your data volume, the rate (quotas) and cost considerations raised in the previous comment. Full example:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import java.util.UUID;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DynamicQueries {

    private static final Logger LOG = LoggerFactory.getLogger(DynamicQueries.class);

    @SuppressWarnings("serial")
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        // create input dummy data     
        PCollection<String> students = p.apply("Read students data", Create.of("Luke", "Leia", "Yoda", "Chewbacca").withCoder(StringUtf8Coder.of()));

        // ParDo to map each student with the grade in BigQuery
        PCollection<KV<String, String>> marks = students.apply("Read marks from BigQuery", ParDo.of(new DoFn<String, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                // Client created per element for simplicity; for real workloads,
                // create it once in a @Setup method instead.
                BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

                QueryJobConfiguration queryConfig =
                    QueryJobConfiguration.newBuilder(
                      "SELECT name, grade "
                          + "FROM `PROJECT_ID.test.students` "
                          + "WHERE name = "
                          + "\"" + c.element() + "\" "  // fetch the appropriate student
                          + "LIMIT 1")
                        .setUseLegacySql(false) // Use standard SQL syntax for queries.
                        .build();

                // Create a job ID so that we can safely retry.
                JobId jobId = JobId.of(UUID.randomUUID().toString());
                Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

                // Wait for the query to complete.
                queryJob = queryJob.waitFor();

                // Check for errors
                if (queryJob == null) {
                  throw new RuntimeException("Job no longer exists");
                } else if (queryJob.getStatus().getError() != null) {
                  throw new RuntimeException(queryJob.getStatus().getError().toString());
                }

                // Get the results.
                TableResult result = queryJob.getQueryResults();

                String mark = "";

                for (FieldValueList row : result.iterateAll()) {
                    mark = row.get("grade").getStringValue();
                }

                c.output(KV.of(c.element(), mark));
            }
        }));

        // log to check everything is right
        marks.apply("Log results", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                LOG.info("Element: " + c.element().getKey() + " " + c.element().getValue());
                c.output(c.element());
            }
        }));

        p.run();
    }
}

And the output is:

Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Yoda A+
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Luke C-
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Chewbacca F
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Leia B+

(Tested with BigQuery 1.22.0 and 2.5.0 Java SDK for Dataflow)
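
One caveat about the ParDo above: the student name from c.element() is concatenated directly into the SQL text, so a value containing quotes would break (or worse, alter) the query. The BigQuery client library supports named query parameters (QueryJobConfiguration.newBuilder(...).addNamedParameter(...)), which is the safer route; failing that, a whitelist check before concatenation is a cheap guard. A minimal stdlib-only sketch (helper name is mine, not part of the original answer):

```java
class QueryGuard {

    // Allow only characters plausible in a student name/ID before the value
    // is spliced into a SQL string literal; reject anything else outright.
    static String safeSqlLiteral(String value) {
        if (value == null || !value.matches("[A-Za-z0-9 _.-]+")) {
            throw new IllegalArgumentException("Suspicious identifier: " + value);
        }
        return "\"" + value + "\"";
    }

    public static void main(String[] args) {
        System.out.println("SELECT name, grade FROM `PROJECT_ID.test.students` "
                + "WHERE name = " + safeSqlLiteral("Chewbacca") + " LIMIT 1");
    }
}
```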

Joust answered 8/11, 2018 at 14:36 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.