Python - How to - BigQuery asynchronous tasks

This may be a dumb question, but I cannot seem to run the Python google-cloud-bigquery library asynchronously.

My goal is to run multiple queries concurrently and wait for all of them to finish with an asyncio.wait() gatherer. I'm using asyncio.create_task() to launch the queries. The problem is that each query waits for the previous one to complete before starting.

Here is my query function (quite simple):

async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:
    job = self.api.query(query, **kwargs)  # starts the job without blocking
    return job.result()  # blocks until the job finishes

Since I cannot await job.result(), should I await something else?

Fundus answered 30/10, 2018 at 13:25 Comment(2)
Is it mandatory to use asyncio? BigQuery's Python API has no support for async yield, so a better option would probably be to execute the queries in the background on a ThreadPoolExecutor.Moreira
I'm just starting with Python, but would it be possible to wrap the query as an async call with your method?Fundus

If you are working inside a coroutine and want to run different queries without blocking the event loop, you can use the run_in_executor function, which runs your queries in background threads without blocking the loop.
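
Here is a minimal sketch of that approach (exec_query and main are illustrative names; it assumes default application credentials):

import asyncio
from google.cloud import bigquery


client = bigquery.Client()


async def exec_query(query):
    loop = asyncio.get_running_loop()
    job = client.query(query)  # the job starts running as soon as it is submitted
    # job.result() blocks, so hand it off to the default thread pool
    return await loop.run_in_executor(None, job.result)


async def main():
    # both blocking waits now run concurrently in worker threads
    r1, r2 = await asyncio.gather(exec_query('SELECT 1'), exec_query('SELECT 2'))
    print(list(r1), list(r2))


asyncio.run(main())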

Make sure, though, that this is exactly what you need: jobs created to run queries in the Python API are already asynchronous and only block when you call job.result(). This means you don't need asyncio at all unless you are inside a coroutine.

Here's a quick possible example of retrieving results as soon as the jobs are finished:

from concurrent.futures import ThreadPoolExecutor, as_completed
import google.cloud.bigquery as bq


client = bq.Client.from_service_account_json('path/to/key.json')
query1 = 'SELECT 1'
query2 = 'SELECT 2'

threads = []
results = []

executor = ThreadPoolExecutor(5)

for job in [client.query(query1), client.query(query2)]:
    # each job is already running; only the blocking result() call goes to the pool
    threads.append(executor.submit(job.result))

# Here you can run any code you like. The interpreter is free

for future in as_completed(threads):
    results.append(list(future.result()))

results will be (the order depends on which job finishes first):

[[Row((2,), {'f0_': 0})], [Row((1,), {'f0_': 0})]]
Moreira answered 30/10, 2018 at 21:42 Comment(0)

Just to share a different solution:

import numpy as np
from time import sleep
from google.cloud import bigquery


# BigQuery client used to submit the jobs
bq = bigquery.Client()

query1 = """
SELECT
  language.name,
  AVG(language.bytes)
FROM `bigquery-public-data.github_repos.languages`
, UNNEST(language) AS language
GROUP BY language.name"""
query2 = 'SELECT 2'


def dummy_callback(future):
    global jobs_done
    jobs_done[future.job_id] = True


jobs = [bq.query(query1), bq.query(query2)]
jobs_done = {job.job_id: False for job in jobs}
for job in jobs:
    job.add_done_callback(dummy_callback)

# blocking loop to wait for jobs to finish
while not (np.all(list(jobs_done.values()))):
    print('waiting for jobs to finish ... sleeping for 1s')
    sleep(1)

print('all jobs done, do your stuff')

Rather than using as_completed, I prefer to use the built-in async functionality of the BigQuery jobs themselves. This also makes it possible to decompose the data pipeline into separate Cloud Functions, without having to keep the main ThreadPoolExecutor alive for the duration of the whole pipeline. Incidentally, this was the reason I was looking into this: my pipelines run longer than the maximum timeout of 9 minutes for Cloud Functions (or even 15 minutes for Cloud Run).

The downside is that I need to keep track of all the job_ids across the various functions, but that is relatively easy to solve when configuring the pipeline by specifying inputs and outputs such that they form a directed acyclic graph.
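
As a rough sketch of that bookkeeping, a later function can look the jobs up by id and check whether they finished (check_jobs_done is a hypothetical helper; client.get_job fetches a job by its id):

from google.cloud import bigquery

client = bigquery.Client()


def check_jobs_done(job_ids):
    # hypothetical helper: fetch each job by id and check whether it has finished
    return all(client.get_job(job_id).done() for job_id in job_ids)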

Tamtam answered 24/7, 2019 at 20:11 Comment(2)
I'm trying to copy this with about 200 queries, each of which takes about 10 seconds, so I have: for q in queries: job = bq.query(query[q]); jobs_done[job.job_id] = False; job.add_done_callback(dummy_callback); then print("Now wait"). I am not seeing any significant speedup. It seems like the slow part is launching the query itself.Blasted
That could well be the case; note that BQ startup times are on the order of several seconds. So if you have many small queries, you won't see much improvement.Tamtam

I used @dkapitan's answer to provide an async wrapper:

import asyncio


async def async_bigquery(client, query):
    done = False

    def callback(future):
        nonlocal done
        done = True

    job = client.query(query)
    job.add_done_callback(callback)  # invoked from a background polling thread
    while not done:
        await asyncio.sleep(.1)  # keep yielding to the event loop until the job is done
    return job
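
A possible way to call it from inside a coroutine (assuming client is a bigquery.Client; once the job is done, job.result() returns without blocking for long):

jobs = await asyncio.gather(async_bigquery(client, 'SELECT 1'),
                            async_bigquery(client, 'SELECT 2'))
results = [list(job.result()) for job in jobs]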
Medlar answered 13/2, 2023 at 10:42 Comment(0)

In fact, I found a way to wrap my query in an async call quite easily thanks to the asyncio.create_task() function. I just needed to wrap the job.result() in a coroutine; here is the implementation. It does run asynchronously now.

class BQApi(object):
    def __init__(self):
        self.api = bigquery.Client.from_service_account_json(BQ_CONFIG["credentials"])

    async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:
        job = self.api.query(query, **kwargs)
        task = asyncio.create_task(self.coroutine_job(job))
        return await task

    @staticmethod
    async def coroutine_job(job):
        return job.result()
Fundus answered 31/10, 2018 at 12:47 Comment(3)
Am struggling with the same question @Antoine Dussarps. Have posted an issue on google-cloud-python, referencing this thread. github.com/googleapis/google-cloud-python/issues/8726Tamtam
I don't think that's doing what you want. create_task schedules some async work and lets you await it. This is so you can start several tasks in parallel, but it does not convert blocking calls into async calls. When coroutine_job is called, it will still block the event loop. What you want here is to run this in a thread via asyncio.to_thread.Catina
This is weird. This snippet does nothing but wraps a sync function into async function. The event loop will be blocked during the execution of the query.Claribel
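
For reference, a minimal sketch of the to_thread approach mentioned above (requires Python 3.9+; exec_query is an illustrative name):

import asyncio
from google.cloud import bigquery

client = bigquery.Client()


async def exec_query(query):
    job = client.query(query)  # the job starts without blocking
    return await asyncio.to_thread(job.result)  # blocking wait runs in a worker thread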

As Willian Fuks said, you have to run it in a separate thread. And there is a library from the creator of FastAPI that makes this simple: asyncer

Usage example:

from asyncer import asyncify

result = await asyncify(future.result)()

It uses an AnyIO thread pool, so by default you'll have up to 40 threads in it.
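
If 40 threads is not enough, AnyIO allows raising that limit; a sketch, to be run from inside async code (100 is an arbitrary value):

import anyio.to_thread

# adjust the default cap on concurrent worker threads
anyio.to_thread.current_default_thread_limiter().total_tokens = 100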

Geny answered 12/4 at 11:41 Comment(0)
