Bulk insert/update with PetaPoco

21

I am using the Save() method to insert or update records, but I would like it to perform a bulk insert and bulk update with only one database hit. How do I do this?

9 Answers

13

In my case, I took advantage of the database.Execute() method.

I created a Sql object containing the first part of my insert statement:

var sql = new Sql("insert into myTable(Name, Age, Gender) values");

for (int i = 0; i < pocos.Count; ++i)
{
    var p = pocos[i];
    sql.Append("(@0, @1, @2)", p.Name, p.Age, p.Gender);
    if (i != pocos.Count - 1)
        sql.Append(",");
}

Database.Execute(sql);

3
PetaPoco's "Sql.Append()" method will produce a StackOverflow error if there are a lot of items in the list. - Zelid
@Zelid How many is "a lot"? - Brondahl
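As the comments note, appending one fragment per row does not scale to very large lists (deep Sql.Append chains, plus SQL Server's per-command parameter limit). A hedged sketch of a small batching helper that could wrap the approach above; the helper name and shape are mine, not part of PetaPoco:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class Batching
{
    // Yield successive slices of at most batchSize items; each slice would
    // then be turned into one multi-row VALUES insert exactly as in the
    // answer above, keeping every command small and the Append chain short.
    public static IEnumerable<List<T>> InBatches<T>(IList<T> items, int batchSize)
    {
        for (int start = 0; start < items.Count; start += batchSize)
            yield return items.Skip(start).Take(batchSize).ToList();
    }
}
```

Each batch would then be fed to the Sql builder and Database.Execute() from the answer above, so no single command exceeds the parameter limit.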

12

I tried two different ways of inserting a large number of rows faster than the default Insert (which is pretty slow when you have lots of rows).

1) Build up a List<T> of pocos first, then insert them all in one loop (and inside a transaction):

using (var tr = PetaPocoDb.GetTransaction())
{
    foreach (var record in listOfRecords)
    {
        PetaPocoDb.Insert(record);
    }
    tr.Complete();
}

2) SqlBulkCopy a DataTable:

var bulkCopy = new SqlBulkCopy(connectionString, SqlBulkCopyOptions.TableLock);
bulkCopy.DestinationTableName = "SomeTable";
bulkCopy.WriteToServer(dt);

To convert my List<T> to a DataTable I used Marc Gravell's Convert generic List/Enumerable to DataTable? function, which worked fine for me (after I rearranged the POCO properties to be in exactly the same order as the table fields in the database).
In a (quick) performance test with about 1,000 rows, SqlBulkCopy was the fastest, roughly 50% faster than the transaction approach.
Hope this helps.

I believe your first approach still hits the database once per insert. - IvoTops
Yes, and it would be interesting to compare it with the speed of a combined-insert TSQL statement. In my case I stopped further performance research when I noticed I was only ~50% slower than bulk copy. - joeriks
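The List<T>-to-DataTable conversion mentioned above can be sketched with a minimal reflection-based helper (a simplified sketch of the idea, not Marc Gravell's actual code; as the answer notes, the property order must match the table's column order for SqlBulkCopy):

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;

public static class DataTableHelper
{
    // Build a DataTable whose columns mirror T's public properties, in
    // declaration order - match this to the DB column order for SqlBulkCopy.
    public static DataTable ToDataTable<T>(IList<T> data)
    {
        var props = TypeDescriptor.GetProperties(typeof(T));
        var table = new DataTable();
        foreach (PropertyDescriptor prop in props)
            // Unwrap Nullable<T> so the column gets the underlying type
            table.Columns.Add(prop.Name,
                Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);

        foreach (T item in data)
        {
            var row = table.NewRow();
            foreach (PropertyDescriptor prop in props)
                row[prop.Name] = prop.GetValue(item) ?? DBNull.Value;
            table.Rows.Add(row);
        }
        return table;
    }
}
```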

7

Inserting in one SQL query is much faster.

Here is a client method for the PetaPoco.Database class that adds the ability to bulk insert any collection:

public void BulkInsertRecords<T>(IEnumerable<T> collection)
{
    try
    {
        OpenSharedConnection();
        using (var cmd = CreateCommand(_sharedConnection, ""))
        {
            var pd = Database.PocoData.ForType(typeof(T));
            var tableName = EscapeTableName(pd.TableInfo.TableName);
            string cols = string.Join(", ", (from c in pd.QueryColumns select tableName + "." + EscapeSqlIdentifier(c)).ToArray());
            var pocoValues = new List<string>();
            var index = 0;
            foreach (var poco in collection)
            {
                var values = new List<string>();
                foreach (var i in pd.Columns)
                {
                    values.Add(string.Format("{0}{1}", _paramPrefix, index++));
                    AddParam(cmd, i.Value.GetValue(poco), _paramPrefix);
                }
                pocoValues.Add("(" + string.Join(",", values.ToArray()) + ")");
            }
            var sql = string.Format("INSERT INTO {0} ({1}) VALUES {2}", tableName, cols, string.Join(", ", pocoValues));
            cmd.CommandText = sql;
            cmd.ExecuteNonQuery();
        }
    }
    finally
    {
        CloseSharedConnection();
    }
}

Very nice! It would be interesting to compare performance against the transaction approach I described (I'm sure yours is faster, but how much?). Also, afaik you should get an extra performance gain if you wrap the inserts in a transaction (http://blog.staticvoid.co.nz/2012/4/26/making_dapper_faster_with_transactions). - joeriks

4

Here is an updated version of Steve Jansen's answer that splits the work into chunks so as to stay under the 2,100-parameter limit.

I commented out the following code, because it produces duplicate data in the database...

                //using (var reader = cmd.ExecuteReader())
                //{
                //    while (reader.Read())
                //    {
                //        inserted.Add(reader[0]);
                //    }
                //}

Updated code:

    /// <summary>
    /// Performs an SQL Insert against a collection of pocos
    /// </summary>
    /// <param name="pocos">A collection of POCO objects that specifies the column values to be inserted.  Assumes that every POCO is of the same type.</param>
    /// <returns>An array of the auto allocated primary key of the new record, or null for non-auto-increment tables</returns>
    public object BulkInsert(IEnumerable<object> pocos)
    {
        Sql sql;
        IList<PocoColumn> columns = new List<PocoColumn>();
        IList<object> parameters;
        IList<object> inserted;
        PocoData pd;
        Type primaryKeyType;
        object template;
        string commandText;
        string tableName;
        string primaryKeyName;
        bool autoIncrement;

        int maxBulkInsert;

        if (null == pocos)
        {
            return new object[] { };
        }

        template = pocos.First<object>();

        if (null == template)
        {
            return null;
        }

        pd = PocoData.ForType(template.GetType());
        tableName = pd.TableInfo.TableName;
        primaryKeyName = pd.TableInfo.PrimaryKey;
        autoIncrement = pd.TableInfo.AutoIncrement;

        //Calculate the maximum chunk size
        maxBulkInsert = 2100 / pd.Columns.Count;
        IEnumerable<object> pacosToInsert = pocos.Take(maxBulkInsert);
        IEnumerable<object> pacosremaining = pocos.Skip(maxBulkInsert);

        try
        {
            OpenSharedConnection();
            try
            {
                var names = new List<string>();
                var values = new List<string>();
                var index = 0;

                foreach (var i in pd.Columns)
                {
                    // Don't insert result columns
                    if (i.Value.ResultColumn)
                        continue;

                    // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                    if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                    {
                        primaryKeyType = i.Value.PropertyInfo.PropertyType;

                        // Setup auto increment expression
                        string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                        if (autoIncExpression != null)
                        {
                            names.Add(i.Key);
                            values.Add(autoIncExpression);
                        }
                        continue;
                    }

                    names.Add(_dbType.EscapeSqlIdentifier(i.Key));
                    values.Add(string.Format("{0}{1}", _paramPrefix, index++));
                    columns.Add(i.Value);
                }

                string outputClause = String.Empty;
                if (autoIncrement)
                {
                    outputClause = _dbType.GetInsertOutputClause(primaryKeyName);
                }

                commandText = string.Format("INSERT INTO {0} ({1}){2} VALUES",
                                _dbType.EscapeTableName(tableName),
                                string.Join(",", names.ToArray()),
                                outputClause
                                );

                sql = new Sql(commandText);
                parameters = new List<object>();
                string valuesText = string.Concat("(", string.Join(",", values.ToArray()), ")");
                bool isFirstPoco = true;
                var parameterCounter = 0;

                foreach (object poco in pacosToInsert)
                {
                    parameterCounter++;
                    parameters.Clear();

                    foreach (PocoColumn column in columns)
                    {
                        parameters.Add(column.GetValue(poco));
                    }

                    sql.Append(valuesText, parameters.ToArray<object>());

                    if (isFirstPoco && pocos.Count() > 1)
                    {
                        valuesText = "," + valuesText;
                        isFirstPoco = false;
                    }
                }

                inserted = new List<object>();

                using (var cmd = CreateCommand(_sharedConnection, sql.SQL, sql.Arguments))
                {
                    if (!autoIncrement)
                    {
                        DoPreExecute(cmd);
                        cmd.ExecuteNonQuery();
                        OnExecutedCommand(cmd);

                        PocoColumn pkColumn;
                        if (primaryKeyName != null && pd.Columns.TryGetValue(primaryKeyName, out pkColumn))
                        {
                            foreach (object poco in pocos)
                            {
                                inserted.Add(pkColumn.GetValue(poco));
                            }
                        }

                        return inserted.ToArray<object>();
                    }

                    object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName);

                    if (pacosremaining.Any())
                    {
                        return BulkInsert(pacosremaining);
                    }

                    return id;

                    //using (var reader = cmd.ExecuteReader())
                    //{
                    //    while (reader.Read())
                    //    {
                    //        inserted.Add(reader[0]);
                    //    }
                    //}

                    //object[] primaryKeys = inserted.ToArray<object>();

                    //// Assign the ID back to the primary key property
                    //if (primaryKeyName != null)
                    //{
                    //    PocoColumn pc;
                    //    if (pd.Columns.TryGetValue(primaryKeyName, out pc))
                    //    {
                    //        index = 0;
                    //        foreach (object poco in pocos)
                    //        {
                    //            pc.SetValue(poco, pc.ChangeType(primaryKeys[index]));
                    //            index++;
                    //        }
                    //    }
                    //}

                    //return primaryKeys;
                }
            }
            finally
            {
                CloseSharedConnection();
            }
        }
        catch (Exception x)
        {
            if (OnException(x))
                throw;
            return null;
        }
    }

4
Below is a BulkInsert method for PetaPoco that expands on taylonr's very clever idea of using the SQL technique of inserting multiple rows via INSERT INTO tab(col1, col2) OUTPUT inserted.[ID] VALUES (@0, @1), (@2, @3), (@4, @5), ..., (@n-1, @n).

It also returns the auto-increment (identity) values of the inserted records, which does not appear to happen in IvoTops' implementation.

NOTE: SQL Server 2012 (and below) has a limit of 2,100 parameters per query. (This is probably the source of the stack overflow exception referenced by Zelid's comment.) You will need to manually split your batches based on the number of columns that are not marked as Ignore or Result. For example, a POCO with 21 columns should be sent in batch sizes of 99, or (2100 - 1) / 21. I may eventually refactor this method to dynamically split batches based on this limit; however, you will always get the best results by managing the batch size outside of this method.
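The batch-size arithmetic in the note can be written out directly; a minimal sketch (2,100 is SQL Server's documented per-command parameter limit, and the 21-column POCO is the note's example):

```csharp
// SQL Server allows at most 2,100 parameters per command. Reserving one
// parameter of headroom and dividing by the number of insertable columns
// gives the largest safe batch size, e.g. 99 rows for a 21-column POCO.
const int MaxParameters = 2100;
int insertableColumns = 21;                              // columns actually sent per row
int batchSize = (MaxParameters - 1) / insertableColumns; // = 99
```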

This method showed an approximate 50% gain in execution time over my previous technique of using a shared connection in a single transaction.

This is one area where Massive really shines - Massive has a Save(params object[] things) method that builds an array of IDbCommands and executes each one on a shared connection. It works out of the box and doesn't run into parameter limits.

/// <summary>
/// Performs an SQL Insert against a collection of pocos
/// </summary>
/// <param name="pocos">A collection of POCO objects that specifies the column values to be inserted.  Assumes that every POCO is of the same type.</param>
/// <returns>An array of the auto allocated primary key of the new record, or null for non-auto-increment tables</returns>
/// <remarks>
///     NOTE: As of SQL Server 2012, there is a limit of 2100 parameters per query.  This limitation does not seem to apply on other platforms, so 
///           this method will allow more than 2100 parameters.  See http://msdn.microsoft.com/en-us/library/ms143432.aspx
///     The name of the table, its primary key, and whether it has an auto-allocated primary key are retrieved from the attributes of the first POCO in the collection
/// </remarks>
public object[] BulkInsert(IEnumerable<object> pocos)
{
    Sql sql;
    IList<PocoColumn> columns = new List<PocoColumn>();
    IList<object> parameters;
    IList<object> inserted;
    PocoData pd;
    Type primaryKeyType;
    object template;
    string commandText;
    string tableName;
    string primaryKeyName;
    bool autoIncrement;


    if (null == pocos)
        return new object[] {};

    template = pocos.First<object>();

    if (null == template)
        return null;

    pd = PocoData.ForType(template.GetType());
    tableName = pd.TableInfo.TableName;
    primaryKeyName = pd.TableInfo.PrimaryKey;
    autoIncrement = pd.TableInfo.AutoIncrement;

    try
    {
        OpenSharedConnection();
        try
        {
            var names = new List<string>();
            var values = new List<string>();
            var index = 0;
            foreach (var i in pd.Columns)
            {
                // Don't insert result columns
                if (i.Value.ResultColumn)
                    continue;

                // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                {
                    primaryKeyType = i.Value.PropertyInfo.PropertyType;

                    // Setup auto increment expression
                    string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                    if (autoIncExpression != null)
                    {
                        names.Add(i.Key);
                        values.Add(autoIncExpression);
                    }
                    continue;
                }

                names.Add(_dbType.EscapeSqlIdentifier(i.Key));
                values.Add(string.Format("{0}{1}", _paramPrefix, index++));
                columns.Add(i.Value);
            }

            string outputClause = String.Empty;
            if (autoIncrement)
            {
                outputClause = _dbType.GetInsertOutputClause(primaryKeyName);
            }

            commandText = string.Format("INSERT INTO {0} ({1}){2} VALUES",
                            _dbType.EscapeTableName(tableName),
                            string.Join(",", names.ToArray()),
                            outputClause
                            );

            sql = new Sql(commandText);
            parameters = new List<object>();
            string valuesText = string.Concat("(", string.Join(",", values.ToArray()), ")");
            bool isFirstPoco = true;

            foreach (object poco in pocos)
            {
                parameters.Clear();
                foreach (PocoColumn column in columns)
                {
                    parameters.Add(column.GetValue(poco));
                }

                sql.Append(valuesText, parameters.ToArray<object>());

                if (isFirstPoco)
                {
                    valuesText = "," + valuesText;
                    isFirstPoco = false;
                }
            }

            inserted = new List<object>();

            using (var cmd = CreateCommand(_sharedConnection, sql.SQL, sql.Arguments))
            {
                if (!autoIncrement)
                {
                    DoPreExecute(cmd);
                    cmd.ExecuteNonQuery();
                    OnExecutedCommand(cmd);

                    PocoColumn pkColumn;
                    if (primaryKeyName != null && pd.Columns.TryGetValue(primaryKeyName, out pkColumn))
                    {
                        foreach (object poco in pocos)
                        {
                            inserted.Add(pkColumn.GetValue(poco));
                        }
                    }

                    return inserted.ToArray<object>();
                }

                // BUG: the following line reportedly causes duplicate inserts; need to confirm
                //object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName);

                using(var reader = cmd.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        inserted.Add(reader[0]);
                    }
                }

                object[] primaryKeys = inserted.ToArray<object>();

                // Assign the ID back to the primary key property
                if (primaryKeyName != null)
                {
                    PocoColumn pc;
                    if (pd.Columns.TryGetValue(primaryKeyName, out pc))
                    {
                        index = 0;
                        foreach(object poco in pocos)
                        {
                            pc.SetValue(poco, pc.ChangeType(primaryKeys[index]));
                            index++;
                        }
                    }
                }

                return primaryKeys;
            }
        }
        finally
        {
            CloseSharedConnection();
        }
    }
    catch (Exception x)
    {
        if (OnException(x))
            throw;
        return null;
    }
}

Another answer pointed out a serious bug in the auto-increment path here that causes every record to be inserted twice - but in the process it kind of mangled everything else. All I actually needed to do was delete the line object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName);. I would edit your post, but you should check first whether that line is really safe to remove. - Aaronaught
Hi @Aaronaught, thanks for the heads-up. I've commented out that line until I can test on a Windows box with VS. The bug doesn't surprise me, since the database I wrote this code against was fairly tolerant of duplicate inserts. - Steve Jansen

3
Here is the code for a bulk insert that you can add to v5.01 PetaPoco.cs. You can paste it somewhere close to the regular insert, around line 1098. Just give it an IEnumerable of pocos and it will send them to the database in batches. The code is 90% the same as the regular insert. I do not have performance comparisons; let me know if you do :)
    /// <summary>
    /// Bulk inserts multiple rows to SQL
    /// </summary>
    /// <param name="tableName">The name of the table to insert into</param>
    /// <param name="primaryKeyName">The name of the primary key column of the table</param>
    /// <param name="autoIncrement">True if the primary key is automatically allocated by the DB</param>
    /// <param name="pocos">The POCO objects that specifies the column values to be inserted</param>
    /// <param name="batchSize">The number of POCOs to be grouped together for each database roundtrip</param>
    public void BulkInsert(string tableName, string primaryKeyName, bool autoIncrement, IEnumerable<object> pocos, int batchSize = 25)
    {
        try
        {
            OpenSharedConnection();
            try
            {
                using (var cmd = CreateCommand(_sharedConnection, ""))
                {
                    var pd = PocoData.ForObject(pocos.First(), primaryKeyName);
                    // Create list of columnnames only once
                    var names = new List<string>();
                    foreach (var i in pd.Columns)
                    {
                        // Don't insert result columns
                        if (i.Value.ResultColumn)
                            continue;

                        // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                        if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                        {
                            // Setup auto increment expression
                            string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                            if (autoIncExpression != null)
                            {
                                names.Add(i.Key);
                            }
                            continue;
                        }
                        names.Add(_dbType.EscapeSqlIdentifier(i.Key));
                    }
                    var namesArray = names.ToArray();

                    var values = new List<string>();
                    int count = 0;
                    do
                    {
                        cmd.CommandText = "";
                        cmd.Parameters.Clear();
                        var index = 0;
                        foreach (var poco in pocos.Skip(count).Take(batchSize))
                        {
                            values.Clear();
                            foreach (var i in pd.Columns)
                            {
                                // Don't insert result columns
                                if (i.Value.ResultColumn) continue;

                                // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                                if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                                {
                                    // Setup auto increment expression
                                    string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                                    if (autoIncExpression != null)
                                    {
                                        values.Add(autoIncExpression);
                                    }
                                    continue;
                                }

                                values.Add(string.Format("{0}{1}", _paramPrefix, index++));
                                AddParam(cmd, i.Value.GetValue(poco), i.Value.PropertyInfo);
                            }

                            string outputClause = String.Empty;
                            if (autoIncrement)
                            {
                                outputClause = _dbType.GetInsertOutputClause(primaryKeyName);
                            }

                            cmd.CommandText += string.Format("INSERT INTO {0} ({1}){2} VALUES ({3})", _dbType.EscapeTableName(tableName),
                                                             string.Join(",", namesArray), outputClause, string.Join(",", values.ToArray()));
                        }
                        // Are we done?
                        if (cmd.CommandText == "") break;
                        count += batchSize;
                        DoPreExecute(cmd);
                        cmd.ExecuteNonQuery();
                        OnExecutedCommand(cmd);
                    }
                    while (true);

                }
            }
            finally
            {
                CloseSharedConnection();
            }
        }
        catch (Exception x)
        {
            if (OnException(x))
                throw;
        }
    }


    /// <summary>
    /// Performs a SQL Bulk Insert
    /// </summary>
    /// <param name="pocos">The POCO objects that specifies the column values to be inserted</param>        
    /// <param name="batchSize">The number of POCOs to be grouped together for each database roundtrip</param>
    public void BulkInsert(IEnumerable<object> pocos, int batchSize = 25)
    {
        if (!pocos.Any()) return;
        var pd = PocoData.ForType(pocos.First().GetType());
        BulkInsert(pd.TableInfo.TableName, pd.TableInfo.PrimaryKey, pd.TableInfo.AutoIncrement, pocos, batchSize);
    }

2

And here is a nice 2018 update, using FastMember from NuGet:

    private static async Task SqlBulkCopyPocoAsync<T>(PetaPoco.Database db, IEnumerable<T> data)
    {
        var pd = PocoData.ForType(typeof(T), db.DefaultMapper);
        using (var bcp = new SqlBulkCopy(db.ConnectionString))
        using (var reader = ObjectReader.Create(data)) 
        {
            // set up a mapping from the property names to the column names
            var propNames = typeof(T).GetProperties().Where(p => Attribute.IsDefined(p, typeof(ResultColumnAttribute)) == false).Select(propertyInfo => propertyInfo.Name).ToArray();
            foreach (var propName in propNames)
            {
                bcp.ColumnMappings.Add(propName, "[" + pd.GetColumnName(propName) + "]");
            }
            bcp.DestinationTableName = pd.TableInfo.TableName;
            await bcp.WriteToServerAsync(reader).ConfigureAwait(false);
        }
    }

2

And if you want to do bulk updates, you can do it like this:

public void BulkUpdate<T>(string tableName, string primaryKeyName, IEnumerable<T> pocos, int batchSize = 25)
{
    try
    {
        object primaryKeyValue = null;

        OpenSharedConnection();
        try
        {
            using (var cmd = CreateCommand(_sharedConnection, ""))
            {
                var pd = PocoData.ForObject(pocos.First(), primaryKeyName);

                int count = 0;
                do
                {
                    cmd.CommandText = "";
                    cmd.Parameters.Clear();
                    var index = 0;

                    var cmdText = new StringBuilder();

                    foreach (var poco in pocos.Skip(count).Take(batchSize))
                    {
                        var sb = new StringBuilder();
                        var colIdx = 0;
                        foreach (var i in pd.Columns)
                        {
                            // Don't update the primary key, but grab the value if we don't have it
                            if (string.Compare(i.Key, primaryKeyName, true) == 0)
                            {
                                primaryKeyValue = i.Value.GetValue(poco);
                                continue;
                            }

                            // Dont update result only columns
                            if (i.Value.ResultColumn)
                                continue;

                            // Build the sql
                            if (colIdx > 0)
                                sb.Append(", ");
                            sb.AppendFormat("{0} = {1}{2}", _dbType.EscapeSqlIdentifier(i.Key), _paramPrefix,
                                            index++);

                            // Store the parameter in the command
                            AddParam(cmd, i.Value.GetValue(poco), i.Value.PropertyInfo);
                            colIdx++;
                        }

                        // Find the property info for the primary key
                        PropertyInfo pkpi = null;
                        if (primaryKeyName != null)
                        {
                            pkpi = pd.Columns[primaryKeyName].PropertyInfo;
                        }


                        cmdText.Append(string.Format("UPDATE {0} SET {1} WHERE {2} = {3}{4};\n",
                                                     _dbType.EscapeTableName(tableName), sb.ToString(),
                                                     _dbType.EscapeSqlIdentifier(primaryKeyName), _paramPrefix,
                                                     index++));
                        AddParam(cmd, primaryKeyValue, pkpi);
                    }

                    if (cmdText.Length == 0) break;

                    if (_providerName.IndexOf("oracle", StringComparison.OrdinalIgnoreCase) >= 0)
                    {
                        cmdText.Insert(0, "BEGIN\n");
                        cmdText.Append("\n END;");
                    }

                    DoPreExecute(cmd);

                    cmd.CommandText = cmdText.ToString();
                    count += batchSize;
                    cmd.ExecuteNonQuery();
                    OnExecutedCommand(cmd);

                } while (true);
            }
        }
        finally
        {
            CloseSharedConnection();
        }
    }
    catch (Exception x)
    {
        if (OnException(x))
            throw;
    }
}

-10
You can just do a foreach loop over your records.
foreach (var record in records) {
    db.Save(record);
}

1
Does this create just one database request? - Dan
No, it hits the database once per record. How would you expect it to be done in one trip? Unless you just want to generate one big update statement and execute that. - Schotime
