Pull large amounts of data from a remote server into a DataFrame

To give as much context as needed: I'm trying to pull data stored on a remote Postgres server (Heroku) into a pandas DataFrame, using psycopg2 to connect.
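
(The connection itself is set up roughly like this; the host and credentials below are placeholders.)

import psycopg2

conn = psycopg2.connect(
    host="ec2-xx-xx-xx-xx.compute-1.amazonaws.com",  # placeholder for the Heroku Postgres host
    dbname="mydb",
    user="me",
    password="secret",
    port=5432,
)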

I'm interested in two specific tables, users and events. The connection works fine, because when pulling down the user data

import pandas.io.sql as sql 
# [...]
users = sql.read_sql("SELECT * FROM users", conn)

after waiting a few seconds, the DataFrame is returned as expected.

<class 'pandas.core.frame.DataFrame'>
Int64Index: 67458 entries, 0 to 67457
Data columns (total 35 columns): [...]

Yet when trying to pull the bigger, heavier events data straight from IPython, after a long time it just crashes:

In [11]: events = sql.read_sql("SELECT * FROM events", conn)
vagrant@data-science-toolbox:~$

and when trying from an IPython notebook I get the Dead kernel error:

The kernel has died, would you like to restart it? If you do not restart the kernel, you will be able to save the notebook, but running code will not work until the notebook is reopened.


Update #1:

To get a better idea of the size of the events table I'm trying to pull in, here are the number of records and the number of attributes for each:

In [11]: sql.read_sql("SELECT count(*) FROM events", conn)
Out[11]:
     count
0  2711453

In [12]: len(sql.read_sql("SELECT * FROM events LIMIT 1", conn).columns)
Out[12]: 18

Update #2:

Memory is definitely a bottleneck for the current implementation of read_sql: when pulling down the events and trying to run another instance of IPython, the result is

vagrant@data-science-toolbox:~$ sudo ipython
-bash: fork: Cannot allocate memory

Update #3:

I first tried a read_sql_chunked implementation that would just return the list of partial DataFrames:

import pandas as pd

def read_sql_chunked(query, conn, nrows, chunksize=1000):
    start = 0
    dfs = []
    while start < nrows:
        df = pd.read_sql("%s LIMIT %s OFFSET %s" % (query, chunksize, start), conn)
        start += chunksize
        dfs.append(df)
        print "Events added: %s to %s of %s" % (start-chunksize, start, nrows)
    # print "concatenating dfs"
    return dfs

event_dfs = read_sql_chunked("SELECT * FROM events", conn, events_count, 100000)

and that works well, but when trying to concatenate the DataFrames, the kernel dies again.
And this is after giving the VM 2GB of RAM.

Based on Andy's explanation of the difference in implementation and performance between read_sql and read_csv, the next thing I tried was to append the records to a CSV and then read them all into a DataFrame:

event_dfs[0].to_csv(path+'new_events.csv', encoding='utf-8')

for df in event_dfs[1:]:
    df.to_csv(path+'new_events.csv', mode='a', header=False, encoding='utf-8')

Again, the writing to CSV completes successfully – a 657 MB file – but reading the CSV back never completes.

How can one approximate how much RAM would be sufficient to read, say, a 657 MB CSV file, since 2 GB seems not to be enough?
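
(For reference, a rough way to estimate this is to read a small sample of the CSV and extrapolate from its in-memory footprint; memory_usage(deep=True) needs a reasonably recent pandas.)

import pandas as pd

sample = pd.read_csv(path + 'new_events.csv', nrows=10000)
bytes_per_row = sample.memory_usage(deep=True).sum() / float(len(sample))
estimated_gb = bytes_per_row * 2711453 / 1e9  # 2711453 = total row count from Update #1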


It feels like I'm missing some fundamental understanding of either DataFrames or psycopg2, but I'm stuck: I can't even pinpoint the bottleneck or where to optimize.

What's the proper strategy to pull larger amounts of data from a remote (postgres) server?

Naphthol answered 2/9, 2014 at 23:17 Comment(5)
As an experience this sucks! Hope we can get this working for you in the future. Out of curiosity, how large is your table / how many rows? – Blumenthal
@AndyHayden updated to add the number of records and the number of attributes for the events table. – Naphthol
Do you need all the data in memory at once? Or is it sufficient to only have part of the data (e.g. certain columns) in a DataFrame at the same time? (But apart from that, your question on how big a DataFrame can be is legitimate, of course.) – Maxim
@Maxim At this point I'm using both: a couple of DataFrames with very small subsets of the 18 columns, and the whole dataset divided into 28 partial DataFrames. At least for initial exploration, having all the data would seem ideal though. – Naphthol
Using something like HDF5 (with pandas read_hdf/HDFStore) would make it easy and fast to query subsets of the data on demand if it is too big to pull into memory at once (much faster than SQL, and it's possible to query a subset, as opposed to CSV). – Maxim
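
A minimal sketch of that HDF5 idea, using the event_dfs list from Update #3 (requires PyTables; 'timestamp' is a hypothetical column chosen for querying):

import pandas as pd

store = pd.HDFStore(path + 'events.h5')
for df in event_dfs:
    store.append('events', df, data_columns=['timestamp'])  # table format, queryable on 'timestamp'
store.close()

# pull back only a subset later, without loading the whole table
recent = pd.read_hdf(path + 'events.h5', 'events', where="timestamp > '2014-08-01'")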

I suspect there are a couple of (related) things at play here causing the slowness:

  1. read_sql is written in Python so it's a little slow (especially compared to read_csv, which is written in Cython - and carefully implemented for speed!) and it relies on sqlalchemy rather than some (potentially much faster) C-DBAPI. The impetus to move to sqlalchemy was to make that move easier in the future (as well as cross-SQL-platform support).
  2. You may be running out of memory because too many Python objects are in memory (this is related to not using a C-DBAPI), but that could potentially be addressed...

I think the immediate solution is a chunk-based approach (and there is a feature request to have this work natively in pandas read_sql and read_sql_table).

EDIT: As of pandas v0.16.2 this chunk-based approach is natively implemented in read_sql.
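
For example, passing a chunksize makes read_sql return an iterator of DataFrames instead of one big frame (a minimal sketch):

import pandas as pd

chunks = pd.read_sql("SELECT * FROM events", con, chunksize=100000)
events = pd.concat(chunks, ignore_index=True)  # the concat still needs enough RAM for the full result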


Since you're using Postgres you have access to LIMIT and OFFSET, which makes chunking quite easy. (Am I right in thinking these aren't available in all SQL dialects?)

First, get the number of rows (or an estimate) in your table:

nrows = con.execute('SELECT count(*) FROM users').fetchone()[0]  # works with an sqlalchemy engine; a raw psycopg2 connection needs a cursor for this

Use this to iterate through the table (for debugging you could add some print statements to confirm that it's still working and hasn't crashed!) and then combine the results:

import pandas as pd

def read_sql_chunked(query, con, nrows, chunksize=1000):
    start = 0
    dfs = []  # Note: could probably make this neater with a generator/for loop
    while start < nrows:
        df = pd.read_sql("%s LIMIT %s OFFSET %s" % (query, chunksize, start), con)
        dfs.append(df)
        start += chunksize  # advance the offset, otherwise this loops forever
    return pd.concat(dfs, ignore_index=True)

Note: this assumes that the database fits in memory! If it doesn't you'll need to work on each chunk (mapreduce style)... or invest in more memory!
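
If it doesn't fit, a per-chunk (mapreduce style) pass might look like the sketch below, which keeps only a small aggregate per chunk; 'event_type' is a hypothetical column used purely for illustration:

import pandas as pd

def aggregate_chunked(query, con, nrows, chunksize=100000):
    counts = None
    for start in range(0, nrows, chunksize):
        chunk = pd.read_sql("%s LIMIT %s OFFSET %s" % (query, chunksize, start), con)
        c = chunk["event_type"].value_counts()  # small per-chunk summary; the chunk is then discarded
        counts = c if counts is None else counts.add(c, fill_value=0)
    return counts

event_type_counts = aggregate_chunked("SELECT * FROM events", con, nrows)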

Blumenthal answered 3/9, 2014 at 5:20 Comment(6)
Memory might very much be the bottleneck: I'm running a VM that only had the default 512 MB. Quickly bumping it up to 1024 MB, and if that doesn't work, I'll give the chunked read a try. – Naphthol
@MariusButuc let me know how this solution fares / if you have any issues! – Blumenthal
Added Update #3 with my new (still less successful) attempts. – Naphthol
@MariusButuc definitely get/allocate more RAM, 2 GB is not very much IMO - it's almost certainly swapping here! You could use PyTables/HDF5 to do the concat on disk... see pandas.pydata.org/pandas-docs/stable/io.html#table-format, but that may not be enough. – Blumenthal
It sure worked on an AWS m3.large... 7 GB of RAM were what did the trick. – Naphthol
Is the use of "%s LIMIT %s OFFSET %s" a hack? Can we find a more robust solution? – Revell

Try reading it with pandas:

import mysql.connector
import pandas as pd

mysql_cn = mysql.connector.connect(host='localhost', port=123, user='xyz', passwd='****', db='xy_db')

data = pd.read_sql('SELECT * FROM table;', con=mysql_cn)

mysql_cn.close()

It worked for me.
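
If the table is too large to fit in memory, the same read_sql call also accepts a chunksize argument in recent pandas versions (a sketch; process is a placeholder for whatever you do per chunk):

import pandas as pd

for chunk in pd.read_sql('SELECT * FROM table;', con=mysql_cn, chunksize=50000):
    process(chunk)  # placeholder: aggregate, write to disk, etc.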

Becquerel answered 5/7, 2017 at 9:29 Comment(0)

Here is a basic cursor example that might be of help:

import psycopg2
# note that we have to import the psycopg2 extras library!
import psycopg2.extras
import sys

def main():
    conn_string = "host='localhost' dbname='my_database' user='postgres' password='secret'"
    ### print the connection string we will use to connect

    conn = psycopg2.connect(conn_string)

    ### HERE IS THE IMPORTANT PART: by specifying a name for the cursor
    ### psycopg2 creates a server-side cursor, which prevents all of the
    ### records from being downloaded at once from the server.
    cursor = conn.cursor('cursor_unique_name', cursor_factory=psycopg2.extras.DictCursor)
    cursor.execute('SELECT * FROM my_table LIMIT 1000')

    ### Because cursor objects are iterable we can just call 'for - in' on
    ### the cursor object and the cursor will automatically advance itself
    ### each iteration.
    ### This loop should run 1000 times, assuming there are at least 1000
    ### records in 'my_table'.
    row_count = 0
    for row in cursor:
        row_count += 1
        print("row: %s    %s\n" % (row_count, row))

if __name__ == "__main__":
    main()
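
Building on that, one possible sketch for turning the streamed rows into a DataFrame chunk by chunk (chunk size, table name and credentials are placeholders):

import pandas as pd
import psycopg2

conn = psycopg2.connect("host='localhost' dbname='my_database' user='postgres' password='secret'")
cursor = conn.cursor('chunked_cursor')  # named cursor -> server-side, rows are streamed
cursor.execute('SELECT * FROM my_table')

frames = []
while True:
    rows = cursor.fetchmany(size=50000)  # pull 50k rows per round trip
    if not rows:
        break
    columns = [desc[0] for desc in cursor.description]
    frames.append(pd.DataFrame(rows, columns=columns))

df = pd.concat(frames, ignore_index=True)  # only do this if the full result fits in RAM
cursor.close()
conn.close()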

Twicetold answered 9/10, 2019 at 6:21 Comment(0)

Using https://github.com/sfu-db/connector-x, much higher speeds seem to be possible.

From their readme:

ConnectorX enables you to load data from databases into Python in the fastest and most memory efficient way.

What you need is one line of code:

import connectorx as cx

cx.read_sql("postgresql://username:password@server:port/database", "SELECT * FROM lineitem")

Optionally, you can accelerate the data loading using parallelism by specifying a partition column.

import connectorx as cx

cx.read_sql("postgresql://username:password@server:port/database", "SELECT * FROM lineitem", partition_on="l_orderkey", partition_num=10)

The function will partition the query by evenly splitting the specified column to the amount of partitions. ConnectorX will assign one thread for each partition to load and write data in parallel.

Note: I have not used it myself, but I have seen huge improvements from connector-x in a friend's project.


Not directly relevant to the question, but if the query is more complex, connector-x has some overhead; see the FAQ.

In that case, Arrow as an intermediate destination can be faster (Arrow can be installed via pip install pyarrow):

table = cx.read_sql(db_uri, query, return_type="arrow") # or arrow2 https://github.com/jorgecarleitao/arrow2
df = table.to_pandas(split_blocks=False, date_as_object=False)
Jeniferjeniffer answered 25/11, 2022 at 19:58 Comment(0)
