How to create a large pandas dataframe from an sql query without running out of memory?
D

11

84

I have trouble querying a table of more than 5 million records from an MS SQL Server database. I want to select all of the records, but my code seems to fail when selecting too much data into memory.

This works:

import pandas.io.sql as psql
sql = "SELECT TOP 1000000 * FROM MyTable" 
data = psql.read_frame(sql, cnxn)

...but this does not work:

sql = "SELECT TOP 2000000 * FROM MyTable" 
data = psql.read_frame(sql, cnxn)

It returns this error:

File "inference.pyx", line 931, in pandas.lib.to_object_array_tuples
(pandas\lib.c:42733) Memory Error

I have read here that a similar problem exists when creating a dataframe from a csv file, and that the work-around is to use the 'iterator' and 'chunksize' parameters like this:

read_csv('exp4326.csv', iterator=True, chunksize=1000)

Is there a similar solution for querying from an SQL database? If not, what is the preferred work-around? Should I use some other methods to read the records in chunks? I read a bit of discussion here about working with large datasets in pandas, but it seems like a lot of work to execute a SELECT * query. Surely there is a simpler approach.

Dough answered 7/8, 2013 at 15:50 Comment(4)
How much memory do you have?Plasterboard
@PhillipCloud my machine has 4GB of ram.Dough
Depending on the dtype of your columns and the number of columns you could easily reach 4GB. E.g.,Plasterboard
Starting from pandas 0.15, you have a chunksize option in read_sql to read and process the query chunk by chunk: pandas.pydata.org/pandas-docs/version/0.15.0/io.html#queryingDrews
E
65

Update: Make sure to check out the answer below, as Pandas now has built-in support for chunked loading.

You could simply try to read the input table chunk-wise and assemble your full dataframe from the individual pieces afterwards, like this:

import pandas as pd
import pandas.io.sql as psql
chunk_size = 10000
offset = 0
dfs = []
while True:
  sql = "SELECT * FROM MyTable ORDER BY ID LIMIT %d OFFSET %d" % (chunk_size, offset)
  dfs.append(psql.read_frame(sql, cnxn))
  offset += chunk_size
  if len(dfs[-1]) < chunk_size:
    break
full_df = pd.concat(dfs)

It is also possible that the whole dataframe is simply too large to fit in memory; in that case you will have no other option than to restrict the number of rows or columns you're selecting.
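Where LIMIT/OFFSET is unavailable, or gets slow on deep pages, one alternative is keyset pagination: remember the last ID seen and ask only for rows after it. The sketch below is not part of the answer above. It uses an in-memory SQLite table as a stand-in for MyTable, the helper name read_in_keyset_chunks is made up for illustration, and it assumes ID is unique and sortable.

```python
import sqlite3

import pandas as pd


def read_in_keyset_chunks(conn, table, key, chunk_size):
    """Yield DataFrames of at most chunk_size rows, ordered by key.

    table and key are interpolated into the SQL text, so they must be
    trusted identifiers, never user input.
    """
    last_key = None
    while True:
        if last_key is None:
            sql = f"SELECT * FROM {table} ORDER BY {key} LIMIT ?"
            params = (chunk_size,)
        else:
            sql = f"SELECT * FROM {table} WHERE {key} > ? ORDER BY {key} LIMIT ?"
            params = (last_key, chunk_size)
        chunk = pd.read_sql_query(sql, conn, params=params)
        if chunk.empty:
            break
        yield chunk
        if len(chunk) < chunk_size:
            break
        # Convert the numpy integer to a plain int so the driver can bind it.
        last_key = int(chunk[key].iloc[-1])


# Stand-in data: 25 rows in an in-memory SQLite table (assumption for the demo).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE MyTable (ID INTEGER PRIMARY KEY, value REAL)")
conn.executemany(
    "INSERT INTO MyTable (ID, value) VALUES (?, ?)",
    [(i, i * 0.5) for i in range(1, 26)],
)

chunks = list(read_in_keyset_chunks(conn, "MyTable", "ID", chunk_size=10))
chunk_lengths = [len(c) for c in chunks]
full_df = pd.concat(chunks, ignore_index=True)
```

On SQL Server the LIMIT clause would presumably become SELECT TOP (n) ... ORDER BY ID. Note that concatenating the chunks at the end still needs enough memory for the full frame.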

Era answered 7/8, 2013 at 16:10 Comment(3)
Thanks, I will try this out, though I fear that memory may indeed be my issue. Also, since I am using MS SQL Server 2008, the LIMIT and OFFSET SQL options are not available to me. Others should refer here for the solution specific to their setup.Dough
You can also write these DataFrames to an HDF5 file (the question you referenced uses that; also peruse the docs on appending to tables: pandas.pydata.org/pandas-docs/dev/io.html#hdf5-pytables), then read them back (in sections, or iterating as needed). HDF5 is much more compact than SQL for data.Skinflint
For Postgres, ORDER BY comes before LIMIT: SELECT * FROM my_table ORDER BY id LIMIT %d OFFSET %d. Newer pandas versions use read_sql instead of read_frame.Cohere
D
82

As mentioned in a comment, starting from pandas 0.15, you have a chunksize option in read_sql to read and process the query chunk by chunk:

sql = "SELECT * FROM My_Table"
for chunk in pd.read_sql_query(sql, engine, chunksize=5):
    print(chunk)

Reference: http://pandas.pydata.org/pandas-docs/version/0.15.2/io.html#querying
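If the goal is a summary rather than the full table, each chunk can be reduced and discarded as it arrives, so only the running result stays in memory. A minimal sketch, assuming an in-memory SQLite table with made-up category and amount columns standing in for My_Table:

```python
import sqlite3

import pandas as pd

# Stand-in data (assumption for the demo): five rows in two categories.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE My_Table (category TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO My_Table VALUES (?, ?)",
    [("a", 1.0), ("b", 2.0), ("a", 3.0), ("b", 4.0), ("a", 5.0)],
)

totals = {}      # running sum per category
row_count = 0    # running number of rows seen

for chunk in pd.read_sql_query("SELECT * FROM My_Table", conn, chunksize=2):
    row_count += len(chunk)
    # Reduce the chunk, fold it into the running totals, then let it go.
    partial = chunk.groupby("category")["amount"].sum()
    for category, amount in partial.items():
        totals[category] = totals.get(category, 0.0) + float(amount)
```

Whether the database driver itself buffers the whole result set before pandas sees the first chunk is a separate question; see the comments below.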

Dignity answered 8/4, 2015 at 18:22 Comment(9)
This is the way to handle issues where RAM size < db_you_wish_to_load sizeDiscover
E.g. a keyword argument skip_chunks=number_of_chunks, that can skip first few chunks without even reading them. The use case is reading end or middle of SQL table with over 1 million rows without resorting to SQL query or SQLalchemy.Tact
For MySQL databases this is not the case. If your dataset won't fit in your available memory, it takes a lot of time to extract since MySQL does not have a server side cursor to work with.Holmgren
Anyone relying on using the chunksize option should first read github.com/pandas-dev/pandas/issues/12265. For many databases, the entire dataset will still be read into memory whole, before an iterator is returned. For some databases, setting connection options appropriately can overcome this problem - for instance with Postgres, set execution_options={'stream_results': True} when creating the engine...Housebreaking
this does not save memory-- it pulls down the whole table and then chunks it.Upwards
@Upwards see Janak Mayer's comment aboveColas
Check the last comment in Janak Mayer's link.Counterfactual
@JanakMayer do you still need to specify chunksize when setting stream_results?Sonnysonobuoy
is there a way to assign each chunk to a different variable as a dataframe?Buyers
G
22

Code solution and remarks.

# Create empty list
dfl = []  

# Create empty dataframe
dfs = pd.DataFrame()  

# Start Chunking
for chunk in pd.read_sql(query, con=conct, chunksize=10000000):

    # Start Appending Data Chunks from SQL Result set into List
    dfl.append(chunk)

# Start appending data from list to dataframe
dfs = pd.concat(dfl, ignore_index=True)

However, my memory analysis tells me that even though the memory is released after each chunk is extracted, the list keeps growing and occupies that memory, resulting in no net gain in free RAM.

Would love to hear what the author / others have to say.
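One way around the growing list, along the lines suggested in the comments, is to write each chunk to disk as it arrives, so that no more than one chunk is held in memory at a time. The sketch below is an illustration under assumptions: an in-memory SQLite table stands in for the real source, and CSV is used only to avoid extra dependencies (Parquet or HDF5 would follow the same pattern).

```python
import os
import sqlite3
import tempfile

import pandas as pd

# Stand-in data (assumption for the demo): 100 rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE MyTable (ID INTEGER PRIMARY KEY, value REAL)")
conn.executemany(
    "INSERT INTO MyTable VALUES (?, ?)",
    [(i, i / 10) for i in range(1, 101)],
)

out_path = os.path.join(tempfile.mkdtemp(), "mytable.csv")

rows_written = 0
chunks = pd.read_sql_query("SELECT * FROM MyTable", conn, chunksize=30)
for i, chunk in enumerate(chunks):
    # First chunk creates the file with a header; later chunks are appended.
    chunk.to_csv(
        out_path,
        mode="w" if i == 0 else "a",
        header=(i == 0),
        index=False,
    )
    rows_written += len(chunk)

# Later, read back only the columns that are actually needed.
ids = pd.read_csv(out_path, usecols=["ID"])
```

The file on disk can then be processed in pieces, for example with read_csv and its own chunksize parameter, as mentioned in the question.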

Giddy answered 26/8, 2019 at 16:44 Comment(7)
Saving chunks to disk rather than keeping the dataset in memory, and deleting each chunk with "del", should be OK. You can also change a chunk's dtypes to reduce memory consumption before saving it. And you can save the df in Parquet format, then read back only the columns you need.Collin
@Giddy why use append and concat both?Fou
(Append) - add extracted data chunks to a list (Concatenate) - combine all chunks in the above list into one dataframeGiddy
@Giddy is there a way to assign each chunk to a different variable as a dataframe?Buyers
@LeylaAlkan If you mean "assign each chunk to a different dataframe" then my understanding is that each element of the <list> dfl is already a <dataframe>. In other words, the <list> dfl is a collection of dataframes that can be referenced individually, as you would reference any element of a list.Giddy
@Giddy The list grows bigger and bigger, then Python throws a memory error. So it's not possible to save them as elements of a list in my case.Buyers
@LeylaAlkan : You are correct. As other people in the thread have said in the above posts, and quoting eljusticiero67: "this does not save memory-- it pulls down the whole table and then chunks it"Giddy
F
5

The best way I found to handle this is to leverage the SQLAlchemy stream_results connection option:

conn = engine.connect().execution_options(stream_results=True)

Then pass the conn object to pandas:

pd.read_sql("SELECT *...", conn, chunksize=10000)

This will ensure that the cursor is handled server-side rather than client-side
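Where SQLAlchemy's stream_results is not available for a driver, a similar incremental read can be sketched directly at the DB-API level with cursor.fetchmany. This is an illustration only: sqlite3 stands in for the real driver, the helper iter_batches is hypothetical, and whether rows are really fetched lazily from the server depends on the driver.

```python
import sqlite3


def iter_batches(cursor, batch_size):
    """Yield lists of rows from an already-executed DB-API cursor."""
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        yield rows


# Stand-in data (assumption for the demo): 7 users.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [(i, f"user{i}") for i in range(1, 8)],
)

cursor = conn.cursor()
cursor.execute("SELECT id, name FROM users ORDER BY id")
columns = [d[0] for d in cursor.description]

batch_sizes = []
for rows in iter_batches(cursor, batch_size=3):
    # Each batch could be turned into a DataFrame with
    # pd.DataFrame.from_records(rows, columns=columns) and processed here.
    batch_sizes.append(len(rows))
```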

Fazeli answered 19/11, 2021 at 21:42 Comment(0)
P
2

You can use Server Side Cursors (a.k.a. stream results)

import pandas as pd
from sqlalchemy import create_engine

def process_sql_using_pandas():
    engine = create_engine(
        "postgresql://postgres:pass@localhost/example"
    )
    conn = engine.connect().execution_options(
        stream_results=True)

    for chunk_dataframe in pd.read_sql(
            "SELECT * FROM users", conn, chunksize=1000):
        print(f"Got dataframe w/{len(chunk_dataframe)} rows")
        # ... do something with dataframe ...

if __name__ == '__main__':
    process_sql_using_pandas()

As mentioned in the comments by others, using the chunksize argument on its own, as in pd.read_sql("SELECT * FROM users", engine, chunksize=1000), does not solve the problem: it still loads all of the data into memory and then hands it to you chunk by chunk.

More explanation here

Plume answered 31/3, 2022 at 20:21 Comment(0)
D
2

Here is a one-liner. I was able to load in 49m records to the dataframe without running out of memory.

dfs = pd.concat(pd.read_sql(sql, engine, chunksize=500000), ignore_index=True)
Divest answered 29/12, 2022 at 19:50 Comment(0)
E
1

chunksize on its own still loads all the data into memory; stream_results=True is the answer. It uses a server-side cursor that loads the rows in the given chunks and saves memory. I use it efficiently in many pipelines, and it may also help when you load historical data.

stream_conn = engine.connect().execution_options(stream_results=True)

Then use pd.read_sql with chunksize:

pd.read_sql("SELECT * FROM SOURCE", stream_conn, chunksize=5000)
Euhemerize answered 29/7, 2022 at 6:18 Comment(2)
stream_results is only applicable to a few databases :(Persistent
Yes, the above answer is specific to the question being discussed here, for MS SQL Server.Euhemerize
H
0

You can update your Airflow version. For example, I hit the error below with version 2.2.3, running under docker-compose with these settings:

  • AIRFLOW__CORE__EXECUTOR=CeleryExecutor

mysq 6.7

cpus: "0.5"
mem_reservation: "10M"
mem_limit: "750M"

redis:

cpus: "0.5"
mem_reservation: "10M"
mem_limit: "250M" 

airflow-webserver:

cpus: "0.5"
mem_reservation: "10M"
mem_limit: "750M"

airflow-scheduler:

cpus: "0.5"
mem_reservation: "10M"
mem_limit: "750M"

airflow-worker:

#cpus: "0.5"
#mem_reservation: "10M"
#mem_limit: "750M"

error: Task exited with return code Negsignal.SIGKILL

But after updating to FROM apache/airflow:2.3.4, the pulls ran without problems, using the same resources configured in the docker-compose file.

My DAG extractor function:

import logging

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# connect_sql_server and generate_schema are the author's own helpers (not shown).
def getDataForSchema(table, conecction, tmp_path, **kwargs):
    conn = connect_sql_server(conecction)

    query_count = f"select count(1) from {table['schema']}.{table['table_name']}"
    logging.info(f"query: {query_count}")
    real_count_rows = pd.read_sql_query(query_count, conn)

    # Get the table schema
    metadataquery = (
        "SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.columns "
        f"where table_name = '{table['table_name']}' and table_schema = '{table['schema']}'"
    )
    metadata = pd.read_sql_query(metadataquery, conn)
    schema = generate_schema(metadata)

    # Query the table to extract, reading it in chunks
    query = f"SELECT {table['custom_column_names']} FROM {table['schema']}.{table['table_name']}"
    logging.info(f"query data: {query}")
    chunksize = table["partition_field"]
    data = pd.read_sql_query(query, conn, chunksize=chunksize)

    count_rows = 0
    pqwriter = None
    iteraccion = 0
    for df_row in data:
        print(f"block {iteraccion}: {count_rows} rows so far out of a total of {real_count_rows.iat[0, 0]}")
        if iteraccion == 0:
            # Open a single Parquet file on the first chunk; later chunks are appended to it
            parquetName = f"{tmp_path}/{table['table_name']}_{iteraccion}.parquet"
            pqwriter = pq.ParquetWriter(parquetName, schema)
        tableData = pa.Table.from_pandas(df_row, schema=schema, safe=False, preserve_index=True)
        pqwriter.write_table(tableData)
        # Alternative: write the DataFrame straight to Parquet
        # df_row.to_parquet(parquetName)
        iteraccion = iteraccion + 1
        count_rows += len(df_row)
        # Drop references so each chunk can be garbage-collected
        del df_row
        del tableData
    if pqwriter:
        print("Closing Parquet file")
        pqwriter.close()
    del data
    del chunksize
    del iteraccion
Hammering answered 15/11, 2022 at 17:2 Comment(0)
P
0

Full code using SQLAlchemy and a with statement, with the load itself done in one line:

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

db_engine = create_engine(db_url, pool_size=10, max_overflow=20)
with Session(db_engine) as session:
    sql_qry = text("Your query")
    data = pd.concat(pd.read_sql(sql_qry, session.connection().execution_options(stream_results=True), chunksize=500000), ignore_index=True)

You can try to change chunksize to find the optimal size for your case.

Photostat answered 30/12, 2022 at 0:35 Comment(0)
U
0

You can use the chunksize option, but you need to set it to a 6-7 digit value if you have RAM issues. Do this:

df1 = []
for chunk in pd.read_sql(sql, engine, params=(fromdt, todt, filecode), chunksize=100000):
    df1.append(chunk)
dfs = pd.concat(df1, ignore_index=True)

Uneven answered 24/1, 2023 at 15:44 Comment(1)
Please use "`" for adding code blocks.Glennieglennis
A
-1

If you want to limit the number of rows in output, just use:

data = next(psql.read_sql(sql, cnxn, chunksize=1000000))
Alkene answered 14/1, 2020 at 12:15 Comment(1)
This does not solve the problem as it still loads the whole data in the memory and then gives it to you chunk by chunkPlume
