Using SQLAlchemy to load a CSV file into a database

I would like to load CSV files into a database using SQLAlchemy.

Account answered 13/7, 2015 at 23:9 Comment(0)

Because of the power of SQLAlchemy, I'm also using it on a project. Its power comes from the object-oriented way of "talking" to a database instead of hardcoding SQL statements that can be a pain to manage. Not to mention, it's also a lot faster.

To answer your question bluntly, yes! Storing data from a CSV into a database using SQLAlchemy is a piece of cake. Here's a full working example (I used SQLAlchemy 1.0.6 and Python 2.7.6):

from numpy import genfromtxt
from time import time
from datetime import datetime
from sqlalchemy import Column, Integer, Float, Date
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

def Load_Data(file_name):
    data = genfromtxt(file_name, delimiter=',', skip_header=1, converters={0: lambda s: str(s)})
    return data.tolist()

Base = declarative_base()

class Price_History(Base):
    #Tell SQLAlchemy what the table name is and if there's any table-specific arguments it should know about
    __tablename__ = 'Price_History'
    __table_args__ = {'sqlite_autoincrement': True}
    #tell SQLAlchemy the name of column and its attributes:
    id = Column(Integer, primary_key=True, nullable=False) 
    date = Column(Date)
    opn = Column(Float)
    hi = Column(Float)
    lo = Column(Float)
    close = Column(Float)
    vol = Column(Float)

if __name__ == "__main__":
    t = time()

    #Create the database
    engine = create_engine('sqlite:///csv_test.db')
    Base.metadata.create_all(engine)

    #Create the session
    session = sessionmaker()
    session.configure(bind=engine)
    s = session()

    try:
        file_name = "t.csv" #sample CSV file used:  http://www.google.com/finance/historical?q=NYSE%3AT&ei=W4ikVam8LYWjmAGjhoHACw&output=csv
        data = Load_Data(file_name) 

        for i in data:
            record = Price_History(**{
                'date' : datetime.strptime(i[0], '%d-%b-%y').date(),
                'opn' : i[1],
                'hi' : i[2],
                'lo' : i[3],
                'close' : i[4],
                'vol' : i[5]
            })
            s.add(record) #Add all the records

        s.commit() #Attempt to commit all the records
    except:
        s.rollback() #Rollback the changes on error
    finally:
        s.close() #Close the connection
    print "Time elapsed: " + str(time() - t) + " s." #0.091s

(Note: this is not necessarily the "best" way to do this, but I think this format is very readable for a beginner; it's also very fast: 0.091s for 251 records inserted!)

I think if you go through it line by line, you'll see what a breeze it is to use. Notice the lack of SQL statements -- hooray! I also took the liberty of using numpy to load the CSV contents in two lines, but it can be done without it if you like.
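
For instance, if you'd rather skip the numpy dependency, here is a minimal sketch of the same loader written with only the standard library csv module (it assumes the same column layout as the sample file: a date string first, followed by five numeric fields):

import csv

def Load_Data_stdlib(file_name):
    #Read the CSV with the standard library, skipping the header row
    rows = []
    with open(file_name) as f:
        reader = csv.reader(f)
        next(reader) #skip the header row
        for row in reader:
            #Keep the date as a string; convert the remaining columns to floats
            rows.append([row[0]] + [float(x) for x in row[1:6]])
    return rows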

If you want to compare against the traditional way of doing it, here's a fully working example for reference:

import sqlite3
import time
from numpy import genfromtxt

def dict_factory(cursor, row):
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d


def Create_DB(db):      
    #Create DB and format it as needed
    with sqlite3.connect(db) as conn:
        conn.row_factory = dict_factory
        conn.text_factory = str

        cursor = conn.cursor()

        cursor.execute("CREATE TABLE [Price_History] ([id] INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL UNIQUE, [date] DATE, [opn] FLOAT, [hi] FLOAT, [lo] FLOAT, [close] FLOAT, [vol] INTEGER);")


def Add_Record(db, data):
    #Insert record into table
    with sqlite3.connect(db) as conn:
        conn.row_factory = dict_factory
        conn.text_factory = str

        cursor = conn.cursor()

        cursor.execute("INSERT INTO Price_History({cols}) VALUES({vals});".format(cols = str(data.keys()).strip('[]'), 
                    vals=str([data[i] for i in data]).strip('[]')
                    ))


def Load_Data(file_name):
    data = genfromtxt(file_name, delimiter=',', skip_header=1, converters={0: lambda s: str(s)})
    return data.tolist()


if __name__ == "__main__":
    t = time.time() 

    db = 'csv_test_sql.db' #Database filename 
    file_name = "t.csv" #sample CSV file used:  http://www.google.com/finance/historical?q=NYSE%3AT&ei=W4ikVam8LYWjmAGjhoHACw&output=csv

    data = Load_Data(file_name) #Get data from CSV

    Create_DB(db) #Create DB

    #For every record, format and insert to table
    for i in data:
        record = {
                'date' : i[0],
                'opn' : i[1],
                'hi' : i[2],
                'lo' : i[3],
                'close' : i[4],
                'vol' : i[5]
            }
        Add_Record(db, record)

    print "Time elapsed: " + str(time.time() - t) + " s." #3.604s

(Note: even in the "old" way, this is by no means the best way to do this, but it's very readable and a "1-to-1" translation from the SQLAlchemy way vs. the "old" way.)

Notice the SQL statements: one to create the table, the other to insert records. Also, notice that it's a bit more cumbersome to maintain long SQL strings vs. a simple class attribute addition. Liking SQLAlchemy so far?

As for your foreign key question: of course, SQLAlchemy has the power to do this too. Here's an example of what a class attribute would look like with a foreign key assignment (assuming the ForeignKey class has also been imported from the sqlalchemy module):

class Asset_Analysis(Base):
    #Tell SQLAlchemy what the table name is and if there's any table-specific arguments it should know about
    __tablename__ = 'Asset_Analysis'
    __table_args__ = {'sqlite_autoincrement': True}
    #tell SQLAlchemy the name of column and its attributes:
    id = Column(Integer, primary_key=True, nullable=False) 
    fid = Column(Integer, ForeignKey('Price_History.id'))

which makes the "fid" column a foreign key pointing to Price_History's id column.
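
As a rough sketch of how the two tables could then be tied together at insert time (the numbers below are made up, and Asset_Analysis would need to be defined before Base.metadata.create_all(engine) runs so that its table exists), you would add a Price_History record, flush the session so its id gets assigned, and then reference that id from Asset_Analysis:

parent = Price_History(date=datetime(2015, 7, 14).date(), opn=34.0, hi=34.5, lo=33.8, close=34.2, vol=1000000)
s.add(parent)
s.flush() #assigns parent.id without committing yet

child = Asset_Analysis(fid=parent.id)
s.add(child)
s.commit()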

Hope that helps!

Highclass answered 14/7, 2015 at 5:0 Comment(10)
I'll take the old way with the sql.Brandybrandyn
This is useful code, but it would be helpful if the data file was included in the example. Then it would be truly self-contained.Blennioid
I haven't checked on why this is happening, but genfromtxt returns the error: genfromtxt() got an unexpected keyword argument 'skiprows'. Numpy is 1.12.1-3 (Debian 9.0).Blennioid
Faheem, a sample CSV file URL is included in one of the comments; first line in the try statement. Download it, place it in the same directory as this script, and run it.Highclass
Faheem, as per docs.scipy.org/doc/numpy/reference/generated/…, skiprows was deprecated. Replace skiprows=1 with skip_header=1. I edited my answer to reflect this change.Highclass
Thanks for the script :) Your data-set is not active at that URL anymore. Also, i took a similar sample data-set (nasdaq.com/symbol/t/historical) to run your script after installing the necessary libraries and had trouble populating the table. I can see the table "Price_History" gets created, but sqlite > select count(id) from Price_History; gave me an output of 0. Any idea what might be wrong?Keen
is there a way to set foreign key that was read from file? (I mean when class X would have one-many relation with class Y, and I've got information about it in csv)Twinscrew
I have created my own data ... just to cope with the list read from the CSV, but the DB file is still filled with zeros!! data has two rows as follows: data = [[11,22,33,...], [44,55,66, ...]]Estelleesten
To whom it may concern, I managed to make it work, here's the tweak (I will just use three values for making it simple) record = Price_history( opn = i[0], hi = i[1], lo = i[2]) Estelleesten
This is cool but doesn't it mean that the data will be read into Python first, and then inserted into the DB? Wouldn't it be faster to just use the database's bulk loader and completely skip the part where the data has to flow through Python? I believe that ARA1307's answer addresses this but is DB-specific. It would be nice to have a SQLAlchemy approach that internally uses the bulk load commands specific to your brand of database.Nunes

In case your CSV is quite large, using INSERTs is very inefficient. You should use a bulk loading mechanism, which differs from database to database. E.g. in PostgreSQL you should use the "COPY FROM" method:

from sqlalchemy import create_engine

with open(csv_file_path, 'r') as f:
    conn = create_engine('postgresql+psycopg2://...').raw_connection()
    cursor = conn.cursor()
    cmd = 'COPY tbl_name(col1, col2, col3) FROM STDIN WITH (FORMAT CSV, HEADER FALSE)'
    cursor.copy_expert(cmd, f)
    conn.commit()
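
If you don't need SQLAlchemy at all for this step, the same idea works with psycopg2 directly. Here is a minimal sketch, with the connection string, table and column names as placeholders just like above:

import psycopg2

with open(csv_file_path, 'r') as f:
    conn = psycopg2.connect('postgresql://...')
    cursor = conn.cursor()
    cmd = 'COPY tbl_name(col1, col2, col3) FROM STDIN WITH (FORMAT CSV, HEADER FALSE)'
    cursor.copy_expert(cmd, f)
    conn.commit()
    conn.close()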
Comfortable answered 30/12, 2015 at 5:52 Comment(4)
For anything serious, you will indeed want to use copy_from or copy_expert from psycopg directly. This solution makes it possible to insert millions of rows at once.Sykes
is there any way to achieve this without importing some huge library?Quota
@Sajuuk: this solution can actually only use psycopg2 (which is included in sqlalchemy), rendering its dependencies considerably reduced (no need to depend on ALL of sqlalchemy, since we're not really using it here). The idea is to replace the first statement's sqlalchemy.create_engine("...").raw_connection() directly with psycopg's psycopg2.connect("...").Atiptoe
I use SQLAlchemy ORM (more out of mandate than preference) but this REALLY should be the accepted answer, at least for Postgres. I have a CSV file with > 1 mil rows, and it took 9 minutes (and required 1.3 GB memory) to use the sqlalchemy-core insert strategy that had the best performance here. Using this COPY ... FROM ... strategy with a StringIO buffer took 10 seconds and only 250 MB memory.Havre

I have had the exact same problem, and I found it paradoxically easier to use a 2-step process with pandas:

import pandas as pd
with open(csv_file_path, 'r') as file:
    data_df = pd.read_csv(file)
data_df.to_sql('tbl_name', con=engine, index=True, index_label='id', if_exists='replace')
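
Here engine is a regular SQLAlchemy engine pointing at your target database. If you don't have one yet, a minimal sketch (reusing the SQLite URL from the first answer; swap in your own connection string) would be:

from sqlalchemy import create_engine

engine = create_engine('sqlite:///csv_test.db') #or e.g. 'postgresql+psycopg2://user:password@localhost/dbname'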

Note that my approach is similar to this one, but somehow Google sent me to this thread instead, so I thought I would share.

Goosefish answered 25/11, 2018 at 21:40 Comment(3)
What if you have a very large file that you need to upload into the database?Bile
For medium-sized files you can set up sqlalchemy engine with create_engine(..., fast_executemany=True), which will speed up pandas' to_sql as well.Epos
what is engine?Solarium

To import a relatively small CSV file into a database using SQLAlchemy, you can use engine.execute(my_table.insert(), list_of_row_dicts), as described in detail in the "Executing Multiple Statements" section of the SQLAlchemy tutorial.

This is sometimes referred to as the "executemany" style of invocation, because it results in an executemany DBAPI call. The DB driver might execute a single multi-value INSERT .. VALUES (..), (..), (..) statement, which results in fewer round-trips to the DB and faster execution.

According to SQLAlchemy's FAQ, this is the fastest you can get without using DB-specific bulk loading methods, such as COPY FROM in Postgres, LOAD DATA LOCAL INFILE in MySQL, etc. In particular, it's faster than using the plain ORM (as in the answer by @Manuel J. Diaz here), bulk_save_objects, or bulk_insert_mappings.

import csv
from sqlalchemy import create_engine, Table, Column, Integer, MetaData

engine = create_engine('sqlite:///sqlalchemy.db', echo=True)

metadata = MetaData()
# Define the table with sqlalchemy:
my_table = Table('MyTable', metadata,
    Column('foo', Integer),
    Column('bar', Integer),
)
metadata.create_all(engine)
insert_query = my_table.insert()

# Or read the definition from the DB:
# metadata.reflect(engine, only=['MyTable'])
# my_table = Table('MyTable', metadata, autoload=True, autoload_with=engine)
# insert_query = my_table.insert()

# Or hardcode the SQL query:
# insert_query = "INSERT INTO MyTable (foo, bar) VALUES (:foo, :bar)"

with open('test.csv', 'r', encoding="utf-8") as csvfile:
    csv_reader = csv.reader(csvfile, delimiter=',')
    engine.execute(
        insert_query,
        [{"foo": row[0], "bar": row[1]} 
            for row in csv_reader]
    )
Epos answered 16/8, 2019 at 18:58 Comment(1)
+1 for stating "it's the fastest you can get without DB-specific..." - though if someone does have the option to use specific bulk loading (COPY FROM, etc.) then they probably shouldHavre

CSV file with commas and header names to PostgreSQL

  1. I'm using the Python csv reader. The CSV data is divided by commas (,).
  2. Then convert it to a pandas DataFrame. The column names are the same as in your CSV file.
  3. Finally, write the DataFrame to SQL with the engine as the connection to the DB, using if_exists='replace' or 'append'.

import csv
import pandas as pd
from sqlalchemy import create_engine

# Create engine to connect with DB
try:
    engine = create_engine(
        'postgresql://username:password@localhost:5432/name_of_base')
except:
    print("Can't create 'engine")

# Get data from CSV file to DataFrame(Pandas)
with open('test.csv', newline='') as csvfile:
    reader = csv.DictReader(csvfile)
    columns = ['one', 'two', 'three']
    df = pd.DataFrame(data=reader, columns=columns)

# Standard method of pandas to deliver data from a DataFrame to PostgreSQL
try:
    with engine.begin() as connection:
        df.to_sql('name_of_table', con=connection, index_label='id', if_exists='replace')
        print('Done, ok!')
except Exception as e:
    print(e)
Lauretta answered 12/2, 2021 at 7:59 Comment(0)

This is the only way I could get it to work. The other answers do not explicitly commit the cursor's connection. This also implies you're using modern Python, SQLAlchemy, and obviously Postgres, since the syntax uses COPY ... FROM.

There's no error handling, it's probably not secure, and it uses all columns in the ORM mapper definition that aren't primary keys, but for simple tasks it'll probably do fine.

import io

import sqlalchemy
import sqlalchemy.orm

Base: sqlalchemy.orm.DeclarativeMeta = sqlalchemy.orm.declarative_base()


def upload_to_model_table(
        Model: Base,
        csv_stream: io.IOBase,
        engine: sqlalchemy.engine.Engine,
        header=True,
        delimiter=';'
):
    """ It's assumed you're using postgres, otherwise this won't work. """
    fieldnames = ', '.join([
        f'"{col.name}"' for col in Model.__mapper__.columns if not col.primary_key
    ])

    sql = """
    COPY {0} ({1}) FROM stdin WITH (format CSV, header {2}, delimiter '{3}')
    """.format(Model.__tablename__, fieldnames, header, delimiter)

    chunk_size = getattr(csv_stream, "_DEFAULT_CHUNK_SIZE", 1024)
    with engine.connect() as connection:
        cursor = connection.connection.cursor()
        cursor.copy_expert(sql, csv_stream, chunk_size)
        cursor.connection.commit()
        cursor.close()
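
A rough usage sketch, where the connection URL, CSV file name, and MyModel mapped class are all placeholders:

engine = sqlalchemy.create_engine('postgresql+psycopg2://user:password@localhost/mydb')

with open('data.csv') as csv_file:
    upload_to_model_table(MyModel, csv_file, engine, header=True, delimiter=',')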

Multipara answered 18/10, 2022 at 13:53 Comment(0)
