Bulk insert/Update with Petapoco
Asked Answered
U

9

21

I'm using the Save() method to insert or update records, but I would like to make it perform a bulk insert and bulk update with only one database hit. How do I do this?

Upthrust answered 6/7, 2011 at 10:44 Comment(0)
P
14

In my case, I took advantage of the database.Execute() method.

I created a SQL parameter that had the first part of my insert:

var sql = new Sql("insert into myTable(Name, Age, Gender) values");

for (int i = 0; i < pocos.Count ; ++i)
{
   var p = pocos[i];
   sql.Append("(@0, @1, @2)", p.Name, p.Age , p.Gender);
   if(i != pocos.Count -1)
     sql.Append(",");
}

Database.Execute(sql);
Pegg answered 10/5, 2012 at 22:5 Comment(2)
It produces StackOverflow in PetaPoco "Sql.Append()" method if there are a lot if items in the list.Kelso
@Kelso How many is "a lot"?Refugiorefulgence
B
12

I tried two different methods for inserting a large quantity of rows faster than the default Insert (which is pretty slow when you have a lot of rows).

1) Making up a List<T> with the poco's first and then inserting them at once within a loop (and in a transaction):

using (var tr = PetaPocoDb.GetTransaction())
{
    foreach (var record in listOfRecords)
    {
        PetaPocoDb.Insert(record);
    }
    tr.Complete();
}

2) SqlBulkCopy a DataTable:

var bulkCopy = new SqlBulkCopy(connectionString, SqlBulkCopyOptions.TableLock);
bulkCopy.DestinationTableName = "SomeTable";
bulkCopy.WriteToServer(dt);

To get my List <T> to a DataTable I used Marc Gravells Convert generic List/Enumerable to DataTable? function which worked ootb for me (after I rearranged the Poco properties to be in the exact same order as the table fields in the db.)

The SqlBulkCopy was fastest, 50% or so faster than the transactions method in the (quick) perf tests I did with ~1000 rows.

Hth

Blane answered 22/11, 2011 at 21:39 Comment(2)
I think your first method still goes to the database for every insertAbecedary
Yeah, would be interesting to compare speeds with the combined inserts tsql. In my case I stopped digging for more perf when I noticed I was only 50% slower than bulkcopy.Blane
K
7

Insert in one SQL query is much faster.

Here is a customer method for PetaPoco.Database class that adds ability to do a bulk insert of any collection:

public void BulkInsertRecords<T>(IEnumerable<T> collection)
        {
            try
            {
                OpenSharedConnection();
                using (var cmd = CreateCommand(_sharedConnection, ""))
                {
                    var pd = Database.PocoData.ForType(typeof(T));
                    var tableName = EscapeTableName(pd.TableInfo.TableName);
                    string cols = string.Join(", ", (from c in pd.QueryColumns select tableName + "." + EscapeSqlIdentifier(c)).ToArray());
                    var pocoValues = new List<string>();
                    var index = 0;
                    foreach (var poco in collection)
                    {
                        var values = new List<string>();
                        foreach (var i in pd.Columns)
                        {
                            values.Add(string.Format("{0}{1}", _paramPrefix, index++));
                            AddParam(cmd, i.Value.GetValue(poco), _paramPrefix);
                        }
                        pocoValues.Add("(" + string.Join(",", values.ToArray()) + ")");
                    }
                    var sql = string.Format("INSERT INTO {0} ({1}) VALUES {2}", tableName, cols, string.Join(", ", pocoValues));
                    cmd.CommandText = sql;
                    cmd.ExecuteNonQuery();
                }
            }
            finally
            {
                CloseSharedConnection();
            }
        }
Kelso answered 23/1, 2013 at 11:52 Comment(1)
Very nice, it would be interesting to compare perfs with the transaction method I describe (I believe yours is faster, but how much?) Also - afaik - if you wrap your inserts inside a transaction you should gain a bit extra perf ( blog.staticvoid.co.nz/2012/4/26/… )Blane
A
4

Below is a BulkInsert method of PetaPoco that expands on taylonr's very clever idea to use the SQL technique of insert multiple rows via INSERT INTO tab(col1, col2) OUTPUT inserted.[ID] VALUES (@0, @1), (@2, 3), (@4, @5), ..., (@n-1, @n).

It also returns the auto-increment (identity) values of inserted records, which I don't believe happens in IvoTops' implementation.

NOTE: SQL Server 2012 (and below) has a limit of 2,100 parameters per query. (This is likely the source of the stack overflow exception referenced by Zelid's comment). You will need to manually split your batches up based on the number of columns that are not decorated as Ignore or Result. For example, a POCO with 21 columns should be sent in batch sizes of 99, or (2100 - 1) / 21. I may refactor this to dynamically split batches based on this limit for SQL Server; however, you will always see the best results by managing the batch size external to this method.

This method showed an approximate 50% gain in execution time over my previous technique of using a shared connection in a single transaction for all inserts.

This is one area where Massive really shines - Massive has a Save(params object[] things) that builds an array of IDbCommands, and executes each one on a shared connection. It works out of the box, and doesn't run into parameter limits.

/// <summary>
/// Performs an SQL Insert against a collection of pocos
/// </summary>
/// <param name="pocos">A collection of POCO objects that specifies the column values to be inserted.  Assumes that every POCO is of the same type.</param>
/// <returns>An array of the auto allocated primary key of the new record, or null for non-auto-increment tables</returns>
/// <remarks>
///     NOTE: As of SQL Server 2012, there is a limit of 2100 parameters per query.  This limitation does not seem to apply on other platforms, so 
///           this method will allow more than 2100 parameters.  See http://msdn.microsoft.com/en-us/library/ms143432.aspx
///     The name of the table, it's primary key and whether it's an auto-allocated primary key are retrieved from the attributes of the first POCO in the collection
/// </remarks>
public object[] BulkInsert(IEnumerable<object> pocos)
{
    Sql sql;
    IList<PocoColumn> columns = new List<PocoColumn>();
    IList<object> parameters;
    IList<object> inserted;
    PocoData pd;
    Type primaryKeyType;
    object template;
    string commandText;
    string tableName;
    string primaryKeyName;
    bool autoIncrement;


    if (null == pocos)
        return new object[] {};

    template = pocos.First<object>();

    if (null == template)
        return null;

    pd = PocoData.ForType(template.GetType());
    tableName = pd.TableInfo.TableName;
    primaryKeyName = pd.TableInfo.PrimaryKey;
    autoIncrement = pd.TableInfo.AutoIncrement;

    try
    {
        OpenSharedConnection();
        try
        {
            var names = new List<string>();
            var values = new List<string>();
            var index = 0;
            foreach (var i in pd.Columns)
            {
                // Don't insert result columns
                if (i.Value.ResultColumn)
                    continue;

                // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                {
                    primaryKeyType = i.Value.PropertyInfo.PropertyType;

                    // Setup auto increment expression
                    string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                    if (autoIncExpression != null)
                    {
                        names.Add(i.Key);
                        values.Add(autoIncExpression);
                    }
                    continue;
                }

                names.Add(_dbType.EscapeSqlIdentifier(i.Key));
                values.Add(string.Format("{0}{1}", _paramPrefix, index++));
                columns.Add(i.Value);
            }

            string outputClause = String.Empty;
            if (autoIncrement)
            {
                outputClause = _dbType.GetInsertOutputClause(primaryKeyName);
            }

            commandText = string.Format("INSERT INTO {0} ({1}){2} VALUES",
                            _dbType.EscapeTableName(tableName),
                            string.Join(",", names.ToArray()),
                            outputClause
                            );

            sql = new Sql(commandText);
            parameters = new List<object>();
            string valuesText = string.Concat("(", string.Join(",", values.ToArray()), ")");
            bool isFirstPoco = true;

            foreach (object poco in pocos)
            {
                parameters.Clear();
                foreach (PocoColumn column in columns)
                {
                    parameters.Add(column.GetValue(poco));
                }

                sql.Append(valuesText, parameters.ToArray<object>());

                if (isFirstPoco)
                {
                    valuesText = "," + valuesText;
                    isFirstPoco = false;
                }
            }

            inserted = new List<object>();

            using (var cmd = CreateCommand(_sharedConnection, sql.SQL, sql.Arguments))
            {
                if (!autoIncrement)
                {
                    DoPreExecute(cmd);
                    cmd.ExecuteNonQuery();
                    OnExecutedCommand(cmd);

                    PocoColumn pkColumn;
                    if (primaryKeyName != null && pd.Columns.TryGetValue(primaryKeyName, out pkColumn))
                    {
                        foreach (object poco in pocos)
                        {
                            inserted.Add(pkColumn.GetValue(poco));
                        }
                    }

                    return inserted.ToArray<object>();
                }

                // BUG: the following line reportedly causes duplicate inserts; need to confirm
                //object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName);

                using(var reader = cmd.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        inserted.Add(reader[0]);
                    }
                }

                object[] primaryKeys = inserted.ToArray<object>();

                // Assign the ID back to the primary key property
                if (primaryKeyName != null)
                {
                    PocoColumn pc;
                    if (pd.Columns.TryGetValue(primaryKeyName, out pc))
                    {
                        index = 0;
                        foreach(object poco in pocos)
                        {
                            pc.SetValue(poco, pc.ChangeType(primaryKeys[index]));
                            index++;
                        }
                    }
                }

                return primaryKeys;
            }
        }
        finally
        {
            CloseSharedConnection();
        }
    }
    catch (Exception x)
    {
        if (OnException(x))
            throw;
        return null;
    }
}
Anxiety answered 18/6, 2013 at 4:17 Comment(2)
Another answer pointed out a serious bug in the auto-increment path here that causes every record to be inserted twice - but kind of made a mess of everything in the process. All I needed to do was remove the following line: object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName);. I'd go ahead and edit your post, but you should probably check to make sure that said line can actually be safely removed.Humphries
Hi @Aaronaught, thanks for the heads up. I've commented out that line until I can get access to a Windows box with VS to test. The bug doesn't surprise me, since the db that I created this code for would have been pretty forgiving about duplicate inserts.Anxiety
H
4

Here is the updated verision of Steve Jansen answer that splits in chuncs of maximum 2100 pacos

I commented out the following code as it produces duplicates in the database...

                //using (var reader = cmd.ExecuteReader())
                //{
                //    while (reader.Read())
                //    {
                //        inserted.Add(reader[0]);
                //    }
                //}

Updated Code

    /// <summary>
    /// Performs an SQL Insert against a collection of pocos
    /// </summary>
    /// <param name="pocos">A collection of POCO objects that specifies the column values to be inserted.  Assumes that every POCO is of the same type.</param>
    /// <returns>An array of the auto allocated primary key of the new record, or null for non-auto-increment tables</returns>
    public object BulkInsert(IEnumerable<object> pocos)
    {
        Sql sql;
        IList<PocoColumn> columns = new List<PocoColumn>();
        IList<object> parameters;
        IList<object> inserted;
        PocoData pd;
        Type primaryKeyType;
        object template;
        string commandText;
        string tableName;
        string primaryKeyName;
        bool autoIncrement;

        int maxBulkInsert;

        if (null == pocos)
        {
            return new object[] { };
        }

        template = pocos.First<object>();

        if (null == template)
        {
            return null;
        }

        pd = PocoData.ForType(template.GetType());
        tableName = pd.TableInfo.TableName;
        primaryKeyName = pd.TableInfo.PrimaryKey;
        autoIncrement = pd.TableInfo.AutoIncrement;

        //Calculate the maximum chunk size
        maxBulkInsert = 2100 / pd.Columns.Count;
        IEnumerable<object> pacosToInsert = pocos.Take(maxBulkInsert);
        IEnumerable<object> pacosremaining = pocos.Skip(maxBulkInsert);

        try
        {
            OpenSharedConnection();
            try
            {
                var names = new List<string>();
                var values = new List<string>();
                var index = 0;

                foreach (var i in pd.Columns)
                {
                    // Don't insert result columns
                    if (i.Value.ResultColumn)
                        continue;

                    // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                    if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                    {
                        primaryKeyType = i.Value.PropertyInfo.PropertyType;

                        // Setup auto increment expression
                        string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                        if (autoIncExpression != null)
                        {
                            names.Add(i.Key);
                            values.Add(autoIncExpression);
                        }
                        continue;
                    }

                    names.Add(_dbType.EscapeSqlIdentifier(i.Key));
                    values.Add(string.Format("{0}{1}", _paramPrefix, index++));
                    columns.Add(i.Value);
                }

                string outputClause = String.Empty;
                if (autoIncrement)
                {
                    outputClause = _dbType.GetInsertOutputClause(primaryKeyName);
                }

                commandText = string.Format("INSERT INTO {0} ({1}){2} VALUES",
                                _dbType.EscapeTableName(tableName),
                                string.Join(",", names.ToArray()),
                                outputClause
                                );

                sql = new Sql(commandText);
                parameters = new List<object>();
                string valuesText = string.Concat("(", string.Join(",", values.ToArray()), ")");
                bool isFirstPoco = true;
                var parameterCounter = 0;

                foreach (object poco in pacosToInsert)
                {
                    parameterCounter++;
                    parameters.Clear();

                    foreach (PocoColumn column in columns)
                    {
                        parameters.Add(column.GetValue(poco));
                    }

                    sql.Append(valuesText, parameters.ToArray<object>());

                    if (isFirstPoco && pocos.Count() > 1)
                    {
                        valuesText = "," + valuesText;
                        isFirstPoco = false;
                    }
                }

                inserted = new List<object>();

                using (var cmd = CreateCommand(_sharedConnection, sql.SQL, sql.Arguments))
                {
                    if (!autoIncrement)
                    {
                        DoPreExecute(cmd);
                        cmd.ExecuteNonQuery();
                        OnExecutedCommand(cmd);

                        PocoColumn pkColumn;
                        if (primaryKeyName != null && pd.Columns.TryGetValue(primaryKeyName, out pkColumn))
                        {
                            foreach (object poco in pocos)
                            {
                                inserted.Add(pkColumn.GetValue(poco));
                            }
                        }

                        return inserted.ToArray<object>();
                    }

                    object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName);

                    if (pacosremaining.Any())
                    {
                        return BulkInsert(pacosremaining);
                    }

                    return id;

                    //using (var reader = cmd.ExecuteReader())
                    //{
                    //    while (reader.Read())
                    //    {
                    //        inserted.Add(reader[0]);
                    //    }
                    //}

                    //object[] primaryKeys = inserted.ToArray<object>();

                    //// Assign the ID back to the primary key property
                    //if (primaryKeyName != null)
                    //{
                    //    PocoColumn pc;
                    //    if (pd.Columns.TryGetValue(primaryKeyName, out pc))
                    //    {
                    //        index = 0;
                    //        foreach (object poco in pocos)
                    //        {
                    //            pc.SetValue(poco, pc.ChangeType(primaryKeys[index]));
                    //            index++;
                    //        }
                    //    }
                    //}

                    //return primaryKeys;
                }
            }
            finally
            {
                CloseSharedConnection();
            }
        }
        catch (Exception x)
        {
            if (OnException(x))
                throw;
            return null;
        }
    }
Hypochondrium answered 23/6, 2013 at 16:24 Comment(0)
A
3

Here is the code for BulkInsert that you can add to v5.01 PetaPoco.cs

You can paste it somewhere close the regular insert at line 1098

You give it an IEnumerable of Pocos and it will send it to the database

in batches of x together. The code is 90% from the regular insert.

I do not have performance comparison, let me know :)

    /// <summary>
    /// Bulk inserts multiple rows to SQL
    /// </summary>
    /// <param name="tableName">The name of the table to insert into</param>
    /// <param name="primaryKeyName">The name of the primary key column of the table</param>
    /// <param name="autoIncrement">True if the primary key is automatically allocated by the DB</param>
    /// <param name="pocos">The POCO objects that specifies the column values to be inserted</param>
    /// <param name="batchSize">The number of POCOS to be grouped together for each database rounddtrip</param>        
    public void BulkInsert(string tableName, string primaryKeyName, bool autoIncrement, IEnumerable<object> pocos, int batchSize = 25)
    {
        try
        {
            OpenSharedConnection();
            try
            {
                using (var cmd = CreateCommand(_sharedConnection, ""))
                {
                    var pd = PocoData.ForObject(pocos.First(), primaryKeyName);
                    // Create list of columnnames only once
                    var names = new List<string>();
                    foreach (var i in pd.Columns)
                    {
                        // Don't insert result columns
                        if (i.Value.ResultColumn)
                            continue;

                        // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                        if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                        {
                            // Setup auto increment expression
                            string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                            if (autoIncExpression != null)
                            {
                                names.Add(i.Key);
                            }
                            continue;
                        }
                        names.Add(_dbType.EscapeSqlIdentifier(i.Key));
                    }
                    var namesArray = names.ToArray();

                    var values = new List<string>();
                    int count = 0;
                    do
                    {
                        cmd.CommandText = "";
                        cmd.Parameters.Clear();
                        var index = 0;
                        foreach (var poco in pocos.Skip(count).Take(batchSize))
                        {
                            values.Clear();
                            foreach (var i in pd.Columns)
                            {
                                // Don't insert result columns
                                if (i.Value.ResultColumn) continue;

                                // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                                if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                                {
                                    // Setup auto increment expression
                                    string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                                    if (autoIncExpression != null)
                                    {
                                        values.Add(autoIncExpression);
                                    }
                                    continue;
                                }

                                values.Add(string.Format("{0}{1}", _paramPrefix, index++));
                                AddParam(cmd, i.Value.GetValue(poco), i.Value.PropertyInfo);
                            }

                            string outputClause = String.Empty;
                            if (autoIncrement)
                            {
                                outputClause = _dbType.GetInsertOutputClause(primaryKeyName);
                            }

                            cmd.CommandText += string.Format("INSERT INTO {0} ({1}){2} VALUES ({3})", _dbType.EscapeTableName(tableName),
                                                             string.Join(",", namesArray), outputClause, string.Join(",", values.ToArray()));
                        }
                        // Are we done?
                        if (cmd.CommandText == "") break;
                        count += batchSize;
                        DoPreExecute(cmd);
                        cmd.ExecuteNonQuery();
                        OnExecutedCommand(cmd);
                    }
                    while (true);

                }
            }
            finally
            {
                CloseSharedConnection();
            }
        }
        catch (Exception x)
        {
            if (OnException(x))
                throw;
        }
    }


    /// <summary>
    /// Performs a SQL Bulk Insert
    /// </summary>
    /// <param name="pocos">The POCO objects that specifies the column values to be inserted</param>        
    /// <param name="batchSize">The number of POCOS to be grouped together for each database rounddtrip</param>        
    public void BulkInsert(IEnumerable<object> pocos, int batchSize = 25)
    {
        if (!pocos.Any()) return;
        var pd = PocoData.ForType(pocos.First().GetType());
        BulkInsert(pd.TableInfo.TableName, pd.TableInfo.PrimaryKey, pd.TableInfo.AutoIncrement, pocos);
    }
Abecedary answered 20/3, 2013 at 16:36 Comment(0)
K
2

And in the same lines if you want BulkUpdate:

public void BulkUpdate<T>(string tableName, string primaryKeyName, IEnumerable<T> pocos, int batchSize = 25)
{
    try
    {
        object primaryKeyValue = null;

        OpenSharedConnection();
        try
        {
            using (var cmd = CreateCommand(_sharedConnection, ""))
            {
                var pd = PocoData.ForObject(pocos.First(), primaryKeyName);

                int count = 0;
                do
                {
                    cmd.CommandText = "";
                    cmd.Parameters.Clear();
                    var index = 0;

                    var cmdText = new StringBuilder();

                    foreach (var poco in pocos.Skip(count).Take(batchSize))
                    {
                        var sb = new StringBuilder();
                        var colIdx = 0;
                        foreach (var i in pd.Columns)
                        {
                            // Don't update the primary key, but grab the value if we don't have it
                            if (string.Compare(i.Key, primaryKeyName, true) == 0)
                            {
                                primaryKeyValue = i.Value.GetValue(poco);
                                continue;
                            }

                            // Dont update result only columns
                            if (i.Value.ResultColumn)
                                continue;

                            // Build the sql
                            if (colIdx > 0)
                                sb.Append(", ");
                            sb.AppendFormat("{0} = {1}{2}", _dbType.EscapeSqlIdentifier(i.Key), _paramPrefix,
                                            index++);

                            // Store the parameter in the command
                            AddParam(cmd, i.Value.GetValue(poco), i.Value.PropertyInfo);
                            colIdx++;
                        }

                        // Find the property info for the primary key
                        PropertyInfo pkpi = null;
                        if (primaryKeyName != null)
                        {
                            pkpi = pd.Columns[primaryKeyName].PropertyInfo;
                        }


                        cmdText.Append(string.Format("UPDATE {0} SET {1} WHERE {2} = {3}{4};\n",
                                                     _dbType.EscapeTableName(tableName), sb.ToString(),
                                                     _dbType.EscapeSqlIdentifier(primaryKeyName), _paramPrefix,
                                                     index++));
                        AddParam(cmd, primaryKeyValue, pkpi);
                    }

                    if (cmdText.Length == 0) break;

                    if (_providerName.IndexOf("oracle", StringComparison.OrdinalIgnoreCase) >= 0)
                    {
                        cmdText.Insert(0, "BEGIN\n");
                        cmdText.Append("\n END;");
                    }

                    DoPreExecute(cmd);

                    cmd.CommandText = cmdText.ToString();
                    count += batchSize;
                    cmd.ExecuteNonQuery();
                    OnExecutedCommand(cmd);

                } while (true);
            }
        }
        finally
        {
            CloseSharedConnection();
        }
    }
    catch (Exception x)
    {
        if (OnException(x))
            throw;
    }
}
Kingdon answered 26/9, 2013 at 12:13 Comment(0)
S
2

Here's a nice 2018 update using FastMember from NuGet:

    private static async Task SqlBulkCopyPocoAsync<T>(PetaPoco.Database db, IEnumerable<T> data)
    {
        var pd = PocoData.ForType(typeof(T), db.DefaultMapper);
        using (var bcp = new SqlBulkCopy(db.ConnectionString))
        using (var reader = ObjectReader.Create(data)) 
        {
            // set up a mapping from the property names to the column names
            var propNames = typeof(T).GetProperties().Where(p => Attribute.IsDefined(p, typeof(ResultColumnAttribute)) == false).Select(propertyInfo => propertyInfo.Name).ToArray();
            foreach (var propName in propNames)
            {
                bcp.ColumnMappings.Add(propName, "[" + pd.GetColumnName(propName) + "]");
            }
            bcp.DestinationTableName = pd.TableInfo.TableName;
            await bcp.WriteToServerAsync(reader).ConfigureAwait(false);
        }
    }
Strader answered 23/3, 2018 at 19:31 Comment(0)
P
-10

You can just do a foreach on your records.

foreach (var record in records) {
    db.Save(record);
}
Parlay answered 6/7, 2011 at 11:55 Comment(2)
Does that only create one database hit?Scratchy
No it hits the database once per record. How do you expect to do it in one database hit? Unless you just want to generate an update statement and execute it.Parlay

© 2022 - 2024 — McMap. All rights reserved.