I considered two options to do this. One is to use BigQueryIO to read the whole table, either materializing it as a side input or joining all the data with CoGroupByKey. The other possibility, the one implemented here, is to use the Java Client Library directly.
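For comparison, a rough sketch of the side-input variant of the first option could look like the fragment below. This is my assumption of how it would be wired, not the approach implemented in this answer; it requires the whole table to fit in worker memory, and a CoGroupByKey version would be analogous. p and students are the Pipeline and input PCollection from the full example further down:

    // Read the whole table once and materialize it as a Map side input.
    PCollectionView<Map<String, String>> gradesView = p
        .apply("Read table", BigQueryIO.readTableRows().from("PROJECT_ID:test.students"))
        .apply("To KV", MapElements.via(new SimpleFunction<TableRow, KV<String, String>>() {
            @Override
            public KV<String, String> apply(TableRow row) {
                return KV.of((String) row.get("name"), (String) row.get("grade"));
            }
        }))
        .apply(View.<String, String>asMap());

    // Look each student up in the in-memory map instead of querying per element.
    PCollection<KV<String, String>> marks = students.apply("Lookup grade",
        ParDo.of(new DoFn<String, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                c.output(KV.of(c.element(), c.sideInput(gradesView).get(c.element())));
            }
        }).withSideInputs(gradesView));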
I created some dummy data with:
$ bq mk test.students name:STRING,grade:STRING
$ bq query --use_legacy_sql=false 'insert into test.students (name, grade) values ("Yoda", "A+"), ("Leia", "B+"), ("Luke", "C-"), ("Chewbacca", "F")'
which looks like this:

    +-----------+-------+
    |   name    | grade |
    +-----------+-------+
    | Yoda      | A+    |
    | Leia      | B+    |
    | Luke      | C-    |
    | Chewbacca | F     |
    +-----------+-------+
and then, within the pipeline, I generate some input dummy data:
Create.of("Luke", "Leia", "Yoda", "Chewbacca")
For each one of these "students" I fetch the corresponding grade in the BigQuery table, following the approach in this example. Keep in mind that, depending on your data volume, the rate (quota) and cost considerations from the previous comment apply. Full example:
import java.util.UUID;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DynamicQueries {

    private static final Logger LOG = LoggerFactory.getLogger(DynamicQueries.class);

    @SuppressWarnings("serial")
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        // Create input dummy data.
        PCollection<String> students = p.apply("Read students data",
            Create.of("Luke", "Leia", "Yoda", "Chewbacca").withCoder(StringUtf8Coder.of()));

        // ParDo to map each student to the corresponding grade in BigQuery.
        PCollection<KV<String, String>> marks = students.apply("Read marks from BigQuery",
            ParDo.of(new DoFn<String, KV<String, String>>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                    // For simplicity the client is built per element; a @Setup
                    // method would let you reuse it across elements.
                    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
                    QueryJobConfiguration queryConfig =
                        QueryJobConfiguration.newBuilder(
                                "SELECT name, grade "
                                    + "FROM `PROJECT_ID.test.students` "
                                    + "WHERE name = "
                                    + "\"" + c.element() + "\" " // fetch the appropriate student
                                    + "LIMIT 1")
                            .setUseLegacySql(false) // use standard SQL syntax for queries
                            .build();

                    // Create a job ID so that we can safely retry.
                    JobId jobId = JobId.of(UUID.randomUUID().toString());
                    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

                    // Wait for the query to complete.
                    queryJob = queryJob.waitFor();

                    // Check for errors.
                    if (queryJob == null) {
                        throw new RuntimeException("Job no longer exists");
                    } else if (queryJob.getStatus().getError() != null) {
                        throw new RuntimeException(queryJob.getStatus().getError().toString());
                    }

                    // Get the results.
                    TableResult result = queryJob.getQueryResults();

                    String mark = "";
                    for (FieldValueList row : result.iterateAll()) {
                        mark = row.get("grade").getStringValue();
                    }

                    c.output(KV.of(c.element(), mark));
                }
            }));

        // Log the results to check everything is right.
        marks.apply("Log results", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                LOG.info("Element: " + c.element().getKey() + " " + c.element().getValue());
                c.output(c.element());
            }
        }));

        p.run();
    }
}
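A note on the query construction: the example concatenates c.element() directly into the SQL string, which is fine for controlled input like this but unsafe for untrusted data. The same client library supports named query parameters; a minimal sketch of the alternative QueryJobConfiguration (it additionally needs an import of com.google.cloud.bigquery.QueryParameterValue) would be:

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                "SELECT name, grade FROM `PROJECT_ID.test.students` WHERE name = @name LIMIT 1")
            .addNamedParameter("name", QueryParameterValue.string(c.element())) // bind the student name
            .setUseLegacySql(false) // named parameters require standard SQL
            .build();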
And the output is:
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Yoda A+
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Luke C-
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Chewbacca F
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Leia B+
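In case you want to reproduce it, assuming a standard Maven project with the exec plugin configured and application-default credentials in place (those details are not part of the example above), it can be run locally with the DirectRunner with something like:

$ mvn compile exec:java -Dexec.mainClass=com.dataflow.samples.DynamicQueries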
(Tested with BigQuery 1.22.0 and 2.5.0 Java SDK for Dataflow)