How can I stream more data than will fit in memory from a PostgreSQL query to a parquet file?

I have the code below, which queries a database of about 500k rows. The process gets a SIGKILL when it hits rows = cur.fetchall(). I've tried iterating through the cursor rather than loading everything into rows, but it still seems to cause OOM issues.

How can I grab all the data from a database and safely convert it into a parquet file regardless of the size of the table?

def get_parquet_for_dataset_id(self, dataset, lob, max_dt):
    query = _table_query(lob, dataset, max_dt)
    conn = self.conns[lob]

    with conn:
        with conn.cursor(cursor_factory=extras.RealDictCursor) as cur:
            cur.execute(query)

            rows = cur.fetchall()

            table = rows_to_table(rows)
            pq_bytes = io.BytesIO()
            pq.write_table(table, pq_bytes)
            _ = pq_bytes.seek(0)

            return pq_bytes

Elonore asked 2/9, 2020 at 20:38 Comment(5)
What are you doing with the data after you receive it? Do you really need to store it all in memory? Can you just stream it out, e.g. by yielding data from an iterator in chunks? – Psychophysics
(A question about how to store larger-than-memory data in a parquet file, if how to do so isn't obvious, probably belongs in a question that's specifically about whatever Python interface or library you're using to create that file, not about PostgreSQL; none of the code that would need to change on the writing-a-file side is currently included in the question.) – Psychophysics
@CharlesDuffy The portion that writes to the parquet file isn't being reached; it's failing at cur.fetchall(). So I guess the question about the parquet file is irrelevant right now, but if there were a way to avoid loading all of this into memory, that would be ideal. – Elonore
Right; as established, you shouldn't do fetchall() at all with data larger than memory (this should be obvious on its face; you can't fetch something you can't fit into process space). – Psychophysics
...that's the whole reason fetchmany() exists: so you can get a smaller collection of rows at a time and repeat until you've processed all of them. Of course, if you're fetching smaller collections at a time, you need to be able to add them to your parquet file incrementally, so you can stop storing them in memory (if you just append each fetchmany() result to a list held in RAM, you're back to your original problem), and needing code to do that is why the question isn't answerable without more content included. – Psychophysics

Here is a way that uses psycopg2, server-side cursors, and pandas to batch/chunk PostgreSQL query results and write them to a parquet file without holding everything in memory at once.

import itertools
import pandas as pd
import psycopg2
import pyarrow as pa
import pyarrow.parquet as pq

def get_schema_and_batches(query, chunk_size):

    def _batches():
        with \
                psycopg2.connect("host=localhost dbname=postgres user=postgres password=password") as conn, \
                conn.cursor(name='my-cursor') as cur:
            # Giving the cursor a name makes it a server-side cursor, so the
            # backend sends rows in chunks rather than transferring everything up front
            cur.itersize = chunk_size
            cur.arraysize = chunk_size
            cur.execute(query)

            while True:
                batch_rows = cur.fetchmany()
                if not batch_rows:
                    break
                # Read the column names after fetching: a named cursor's description
                # may not be populated until rows have been fetched
                column_names = tuple(col[0] for col in cur.description)
                yield pa.RecordBatch.from_pandas(
                    pd.DataFrame(batch_rows, columns=column_names),
                    preserve_index=False,
                )

    # Faffy to infer the schema from the first batch
    # Could be simplified if we knew it ahead of time
    batches = iter(_batches())
    first_batch = next(batches)

    return first_batch.schema, itertools.chain((first_batch,), batches)

query = 'SELECT * FROM generate_series(0, 100000) AS s(my_col)'
schema, batches = get_schema_and_batches(query, chunk_size=10000)

with pq.ParquetWriter('example.parquet', schema=schema) as writer:
    for batch in batches:
        writer.write_batch(batch)
Parental answered 19/9, 2022 at 10:23 Comment(0)

Here is a way that uses streampq (full disclosure: written by me) and Pandas to batch/chunk PostgreSQL query results and write them to a parquet file without all the results being in memory at once.

Note that, unlike the psycopg2 answer above, this doesn't use server-side cursors. One reason you might want to avoid server-side cursors is that some queries can be slower with them.

import itertools
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from streampq import streampq_connect

def get_schema_and_batches(sql, chunk_size):

    def _batches():
        with streampq_connect((('host','localhost'), ('dbname','postgres'), ('user', 'postgres'), ('password','password'))) as query:
            # streampq yields (columns, rows) per statement, with rows as a lazy
            # iterable, so it can be sliced into fixed-size chunks
            for (columns, rows) in query(sql):
                rows_it = iter(rows)
                while True:
                    chunk = list(itertools.islice(rows_it, chunk_size))
                    if not chunk:
                        break
                    yield pa.RecordBatch.from_pandas(
                        pd.DataFrame(chunk, columns=columns),
                        preserve_index=False,
                    )

    # Faffy to infer the schema from the first batch
    # Could be simplified if we knew it ahead of time
    batches = iter(_batches())
    first_batch = next(batches)

    return first_batch.schema, itertools.chain((first_batch,), batches)

sql = 'SELECT * FROM generate_series(0, 100000) AS s(my_col)'
schema, batches = get_schema_and_batches(sql, chunk_size=10000)

with pq.ParquetWriter('example.parquet', schema=schema) as writer:
    for batch in batches:
        writer.write_batch(batch)
Parental answered 19/9, 2022 at 10:36 Comment(0)

Use a server-side cursor; see the psycopg2 documentation on server-side cursors:

When a database query is executed, the Psycopg cursor usually fetches all the records returned by the backend, transferring them to the client process. If the query returned a huge amount of data, a proportionally large amount of memory will be allocated by the client.

If the dataset is too large to be practically handled on the client side, it is possible to create a server side cursor. Using this kind of cursor it is possible to transfer to the client only a controlled amount of data, so that a large dataset can be examined without keeping it entirely in memory.

Erstwhile answered 2/9, 2020 at 21:30 Comment(1)
Unfortunately, the OP's question goes well beyond its title -- see the "and safely convert it into a parquet file" portion of the text. (As they don't include the code they're using for that purpose, there may well be cause to close the question as lacking a minimal reproducible example; but in any event the answer doesn't really address it.) – Psychophysics
