How to use the asynchronous feature of psycopg2?

I'm trying to execute 3 different PostgreSQL queries on different tables. Each query takes 2 seconds to execute. I was wondering if it's possible to run all 3 queries at the same time so that I can save 4 seconds. I tried using the asynchronous feature of psycopg2, but it only returns the result of the last query. Can anyone point out what I'm doing wrong?

import select
import psycopg2
import psycopg2.extensions

def wait(conn):
    # Drive the asynchronous connection until its pending operation completes
    while True:
        state = conn.poll()
        if state == psycopg2.extensions.POLL_OK:
            break
        elif state == psycopg2.extensions.POLL_WRITE:
            select.select([], [conn.fileno()], [])
        elif state == psycopg2.extensions.POLL_READ:
            select.select([conn.fileno()], [], [])
        else:
            raise psycopg2.OperationalError("poll() returned %s" % state)


# pg_name, pg_username, pg_host and pg_password are defined elsewhere
aconn = psycopg2.connect(
  dbname=pg_name,
  user=pg_username,
  host=pg_host,
  password=pg_password,
  async_=1)  # "async" is a reserved word since Python 3.7; psycopg2 >= 2.7 accepts async_

wait(aconn)
acurs = aconn.cursor()

# Three statements sent in a single execute() call
acurs.execute(
    "SELECT 1;"
    "SELECT ST_Length(ST_GeomFromText"
    "('LINESTRING(743238 2967416,743238 2967450)',4326));"
    "SELECT 3;"
)
wait(acurs.connection)
result = acurs.fetchall()
print(result)

This only prints the result of the last query: "result": [[3]]

Detonate answered 24/7, 2016 at 2:38

Per the Psycopg Introduction:

[Psycopg] is a wrapper for the libpq, the official PostgreSQL client library.

Then, looking at the libpq documentation for PQexec() (the function used to send SQL queries to the PostgreSQL database), we see the following note (the last sentence is the important part):

Multiple queries sent in a single PQexec call are processed in a single transaction, unless there are explicit BEGIN/COMMIT commands included in the query string to divide it into multiple transactions. Note however that the returned PGresult structure describes only the result of the last command executed from the string.

So, unfortunately, what you're trying to do is simply not supported by psycopg2 and libpq. (This isn't to say that other client interfaces to PostgreSQL don't support it, though, but that's out of scope for this question.)

So to answer your question, what you're doing wrong is executing multiple SQL queries in one execute() call and then trying to retrieve all of their results, which isn't possible. You need to execute each query explicitly and retrieve its results individually, or else try to find another PostgreSQL API that supports returning multiple result sets at once.


The Python Database API 2.0 specification does allow libraries to implement the optional nextset() method, which moves the cursor to the next result set returned by the executed queries. psycopg2 does not implement it (consistent with the libpq behavior quoted above), and calling it raises a NotSupportedError exception (see the psycopg2 cursor documentation).
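To still get the time savings the question asks for while executing each query individually, one common approach is to give each query its own asynchronous connection and poll all of them together. The following is a minimal sketch under those assumptions: wait_all() and run_concurrently() are hypothetical helper names, not psycopg2 API; wait_all() generalizes the question's wait() to several connections (raising RuntimeError instead of psycopg2.OperationalError so it runs without psycopg2); and run_concurrently() assumes psycopg2 >= 2.7 for the async_ keyword.

```python
import select

try:
    from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE
except ImportError:
    # Fallback values matching psycopg2.extensions, so wait_all() can be
    # exercised without psycopg2 installed
    POLL_OK, POLL_READ, POLL_WRITE = 0, 1, 2


def wait_all(conns):
    """Poll several asynchronous connections until each one reports POLL_OK."""
    pending = list(conns)
    while pending:
        readers, writers, still_pending = [], [], []
        for conn in pending:
            state = conn.poll()
            if state == POLL_OK:
                continue  # this connection has finished its current operation
            if state == POLL_READ:
                readers.append(conn)
            elif state == POLL_WRITE:
                writers.append(conn)
            else:
                raise RuntimeError("poll() returned %s" % state)
            still_pending.append(conn)
        pending = still_pending
        if pending:
            # Sleep until at least one unfinished connection can make progress;
            # select() accepts any object that has a fileno() method
            select.select(readers, writers, [])


def run_concurrently(dsn, queries):
    """Hypothetical helper: run each query on its own async connection."""
    import psycopg2  # imported here so wait_all() above has no hard dependency

    conns = [psycopg2.connect(dsn, async_=1) for _ in queries]
    try:
        wait_all(conns)  # finish establishing every connection
        cursors = [conn.cursor() for conn in conns]
        for cur, sql in zip(cursors, queries):
            cur.execute(sql)  # returns immediately in asynchronous mode
        wait_all(conns)  # wait for every query; the server runs them concurrently
        return [cur.fetchall() for cur in cursors]
    finally:
        for conn in conns:
            conn.close()
```

For example, run_concurrently(dsn, ["SELECT 1", "SELECT 2", "SELECT 3"]) would return one fetchall() list per query. With three 2-second queries, the total wait should be roughly 2 seconds, provided the server can run the three sessions in parallel. Separate connections are needed because an asynchronous psycopg2 connection can only have one query in flight at a time.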

Whitewing answered 29/11, 2016 at 15:43

You can use the aiopg package along with Python's built-in asyncio.gather to execute queries concurrently. The aiopg package uses psycopg2 under the hood.

This example runs three queries that each take 2 s; however, the total execution time is about 2 s rather than 6 s, because each query runs on its own connection.

import asyncio
import aiopg

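# PG_DATABASE, PG_USER, PG_PASSWORD and PG_HOST are defined elsewhere
dsn = f'dbname={PG_DATABASE} user={PG_USER} password={PG_PASSWORD} host={PG_HOST}'

async def query(sql: str):
    # Each call opens its own connection, so the server can run the queries in parallel
    async with aiopg.connect(dsn) as con:
        async with con.cursor() as cursor:
            await cursor.execute(sql)
            result = []
            async for row in cursor:
                result.append(row)
            return result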


async def main():
    result1, result2, result3 = await asyncio.gather(
        query('SELECT pg_sleep(2)'),
        query('SELECT pg_sleep(2)'),
        query('SELECT pg_sleep(2)')
    )
    print(result1, result2, result3)

asyncio.run(main())
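Why asyncio.gather gives roughly 2 s instead of 6 s can be checked without a database. The sketch below is an assumption-laden stand-in, not aiopg: the hypothetical fake_query() coroutine replaces each SELECT pg_sleep(2) with asyncio.sleep, and 0.2 s is used instead of 2 s to keep it quick.

```python
import asyncio
import time


async def fake_query(label: str, seconds: float) -> list:
    # Hypothetical stand-in for a slow query such as SELECT pg_sleep(2):
    # awaiting asyncio.sleep hands control back to the event loop while "waiting"
    await asyncio.sleep(seconds)
    return [(label,)]


async def run_all(seconds: float = 0.2):
    start = time.perf_counter()
    # gather() schedules all three coroutines at once and returns results in call order
    results = await asyncio.gather(
        fake_query("a", seconds),
        fake_query("b", seconds),
        fake_query("c", seconds),
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(run_all())
print(results, f"{elapsed:.2f}s")
```

The elapsed time should be close to a single 0.2 s wait rather than the 0.6 s a sequential loop would take, because the waits overlap. With real queries the same overlap only happens when each one runs on its own connection, as in the aiopg example.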
Sarilda answered 29/1 at 18:35

It looks like asynchronous support is available as of version 2.2; the documentation uses this wait() loop:

import select
import psycopg2
import psycopg2.extensions

def wait(conn):
    while True:
        state = conn.poll()
        if state == psycopg2.extensions.POLL_OK:
            break
        elif state == psycopg2.extensions.POLL_WRITE:
            select.select([], [conn.fileno()], [])
        elif state == psycopg2.extensions.POLL_READ:
            select.select([conn.fileno()], [], [])
        else:
            raise psycopg2.OperationalError("poll() returned %s" % state)

Source: https://www.psycopg.org/docs/advanced.html#asynchronous-support

Skuld answered 23/7, 2021 at 18:49
