MySQL数据库中插入行的最高效方式。

55

我看了很多关于此问题的问题,但没有找到一个足够快的答案。我认为有更好的方法将大量行插入到MySQL数据库中。

我使用以下代码将100k插入我的MySQL数据库:

public static void CSVToMySQL()
{
    string ConnectionString = "server=192.168.1xxx";
    string Command = "INSERT INTO User (FirstName, LastName ) VALUES (@FirstName, @LastName);";
    using (MySqlConnection mConnection = new MySqlConnection(ConnectionString))
    {
        mConnection.Open();

        for(int i =0;i< 100000;i++) //inserting 100k items
        using (MySqlCommand myCmd = new MySqlCommand(Command, mConnection))
        {
            myCmd.CommandType = CommandType.Text;
            myCmd.Parameters.AddWithValue("@FirstName", "test");
            myCmd.Parameters.AddWithValue("@LastName", "test");
            myCmd.ExecuteNonQuery();
        }
    }
}

处理10万行数据大约需要40秒。如何使其更快或更高效?

也许通过DataTable/DataAdapter插入多行或一次性插入会更快:

INSERT INTO User (Fn, Ln) VALUES (@Fn1, @Ln1), (@Fn2, @Ln2)...

由于安全问题,我无法将数据加载到文件中并使用MySQLBulkLoad进行加载。


Fubo 正在正确的轨道上,但它需要被“参数化”。然后你可以调整批量大小以获得最佳性能。这些批量大小将根据 CPU/RAM 等而有所不同。 - Simon
11个回答

76

这是我的“多重插入”代码。

插入10万行仅花费了3秒钟,而不是40秒!

public static void BulkToMySQL()
{
    string ConnectionString = "server=192.168.1xxx";
    StringBuilder sCommand = new StringBuilder("INSERT INTO User (FirstName, LastName) VALUES ");           
    using (MySqlConnection mConnection = new MySqlConnection(ConnectionString))
    {
        List<string> Rows = new List<string>();
        for (int i = 0; i < 100000; i++)
        {
            Rows.Add(string.Format("('{0}','{1}')", MySqlHelper.EscapeString("test"), MySqlHelper.EscapeString("test")));
        }
        sCommand.Append(string.Join(",", Rows));
        sCommand.Append(";");
        mConnection.Open();
        using (MySqlCommand myCmd = new MySqlCommand(sCommand.ToString(), mConnection))
        {
            myCmd.CommandType = CommandType.Text;
            myCmd.ExecuteNonQuery();
        }
    }
}

生成的SQL语句看起来像这样:

INSERT INTO User (FirstName, LastName) VALUES ('test','test'),('test','test'),... ;

更新:感谢 Salman A,我添加了MySQLHelper.EscapeString以避免代码注入,该函数在使用参数时内部使用。


只需在“test”中的撇号上执行替换操作,将其替换为双撇号,就可以了。"test" ==> "test".Replace("'", "''") - Stefan Steiger
@fubo 应该是这样的。我猜它使用了相同或类似的代码来转义参数值。 - Salman A
2
现在你在内存中创建了一个相当长的字符串。 - ed22
高效,但如果您需要插入项的ID,则可能无法正常工作。 - Tronald
我认为这段代码可能会受到SQL注入攻击?是吗? - Akshay
显示剩余4条评论

19

使用MySqlDataAdapter、transactions和UpdateBatchSize进行了一次小测试。它比您的第一个示例快约30倍。 Mysql运行在单独的框上,因此涉及延迟。批处理大小可能需要进行一些调整。代码如下:

string ConnectionString = "server=xxx;Uid=xxx;Pwd=xxx;Database=xxx";

string Command = "INSERT INTO User2 (FirstName, LastName ) VALUES (@FirstName, @LastName);";


 using (var mConnection = new MySqlConnection(ConnectionString))
     {
         mConnection.Open();
         MySqlTransaction transaction = mConnection.BeginTransaction();

        //Obtain a dataset, obviously a "select *" is not the best way...
        var mySqlDataAdapterSelect = new MySqlDataAdapter("select * from User2", mConnection);

        var ds = new DataSet();

        mySqlDataAdapterSelect.Fill(ds, "User2");


        var mySqlDataAdapter = new MySqlDataAdapter();

        mySqlDataAdapter.InsertCommand = new MySqlCommand(Command, mConnection);


        mySqlDataAdapter.InsertCommand.Parameters.Add("@FirstName", MySqlDbType.VarChar, 32, "FirstName");
        mySqlDataAdapter.InsertCommand.Parameters.Add("@LastName", MySqlDbType.VarChar, 32, "LastName");
        mySqlDataAdapter.InsertCommand.UpdatedRowSource = UpdateRowSource.None;

        var stopwatch = new Stopwatch();
        stopwatch.Start();

        for (int i = 0; i < 50000; i++)
        {
            DataRow row = ds.Tables["User2"].NewRow();
            row["FirstName"] = "1234";
            row["LastName"] = "1234";
            ds.Tables["User2"].Rows.Add(row);
        }

         mySqlDataAdapter.UpdateBatchSize = 100;
         mySqlDataAdapter.Update(ds, "User2");

         transaction.Commit();

         stopwatch.Stop();
         Debug.WriteLine(" inserts took " + stopwatch.ElapsedMilliseconds + "ms");
    }
}

只需要13秒钟 - 取决于UpdateBatchSize。而且似乎需要MySqlTransaction。 - fubo
好的,所以你只快了3倍,太糟糕了。 - Konstantin
1
你可以使用 mySqlDataAdapterSelect.FillSchema(dt, SchemaType.Mapped); 从数据库获取一个空结构。 - fubo
好的,很棒。我刚为您完成了这个示例,我很好奇它是否可行。我通常使用NHibernate或类似工具 :) - Konstantin
为了使这个方法更加通用,可以使用以下代码:myAdapter.InsertCommand= new MySqlCommandBuilder(myAdapter).GetInsertCommand() - fubo
显示剩余3条评论

13

在一个事务中执行命令,并在每个迭代中重用相同的命令实例。为了进一步优化性能,可以在一个命令中发送100个查询。进行并行执行可能会获得更好的性能(Parallel.For),但要确保每个并行循环都有自己的MySqlCommand实例。

public static void CSVToMySQL()
{
    string ConnectionString = "server=192.168.1xxx";
    string Command = "INSERT INTO User (FirstName, LastName ) VALUES (@FirstName, @LastName);";
    using (MySqlConnection mConnection = new MySqlConnection(ConnectionString)) 
    {
        mConnection.Open();
        using (MySqlTransaction trans = mConnection.BeginTransaction()) 
        {
            using (MySqlCommand myCmd = new MySqlCommand(Command, mConnection, trans)) 
            {
                myCmd.CommandType = CommandType.Text;
                for (int i = 0; i <= 99999; i++) 
                {
                    //inserting 100k items
                    myCmd.Parameters.Clear();
                    myCmd.Parameters.AddWithValue("@FirstName", "test");
                    myCmd.Parameters.AddWithValue("@LastName", "test");
                    myCmd.ExecuteNonQuery();
                }
                trans.Commit();
            }
        }
    }
}

谢谢你的回答。那花了44秒 :( 看起来 MySqlTransaction 是由SQL-Server处理的,每行都是单独发送的。 - fubo
1
只是指出,这种方法是阻塞的,将方法更改为异步和ExecuteNonQuery更改为ExecuteNonQueryAsync,以及Open更改为OpenAsync并不能防止阻塞。 - Kraang Prime
得到100分赏金的答案中的方法比这个方法要更高效。 - SexyBeast

6
这种方法可能不比使用StringBuilder更快,但它是可参数化的:
/// <summary>
    /// Bulk insert some data, uses parameters
    /// </summary>
    /// <param name="table">The Table Name</param>
    /// <param name="inserts">Holds list of data to insert</param>
    /// <param name="batchSize">executes the insert after batch lines</param>
    /// <param name="progress">Progress reporting</param>
    public void BulkInsert(string table, MySQLBulkInsertData inserts, int batchSize = 100, IProgress<double> progress = null)
    {
        if (inserts.Count <= 0) throw new ArgumentException("Nothing to Insert");

        string insertcmd = string.Format("INSERT INTO `{0}` ({1}) VALUES ", table,
                                         inserts.Fields.Select(p => p.FieldName).ToCSV());
        StringBuilder sb = new StringBuilder(); 
        using (MySqlConnection conn = new MySqlConnection(ConnectionString))
        using (MySqlCommand sqlExecCommand = conn.CreateCommand())
        {
            conn.Open();
            sb.AppendLine(insertcmd);
            for (int i = 0; i < inserts.Count; i++)
            {
                sb.AppendLine(ToParameterCSV(inserts.Fields, i));
                for (int j = 0; j < inserts[i].Count(); j++)
                {
                    sqlExecCommand.Parameters.AddWithValue(string.Format("{0}{1}",inserts.Fields[j].FieldName,i), inserts[i][j]);
                }
                //commit if we are on the batch sizeor the last item
                if (i > 0 && (i%batchSize == 0 || i == inserts.Count - 1))
                {
                    sb.Append(";");
                    sqlExecCommand.CommandText = sb.ToString();
                    sqlExecCommand.ExecuteNonQuery();
                    //reset the stringBuilder
                    sb.Clear();
                    sb.AppendLine(insertcmd);
                    if (progress != null)
                    {
                        progress.Report((double)i/inserts.Count);
                    }
                }
                else
                {
                    sb.Append(",");
                }
            }
        }
    }

以下是使用助手类的示例:

/// <summary>
/// Helper class to builk insert data into a table
/// </summary>
public struct MySQLFieldDefinition
{
    public MySQLFieldDefinition(string field, MySqlDbType type) : this()
    {
        FieldName = field;
        ParameterType = type;
    }

    public string FieldName { get; private set; }
    public MySqlDbType ParameterType { get; private set; }
}

///
///You need to ensure the fieldnames are in the same order as the object[] array
///
public class MySQLBulkInsertData : List<object[]>
{
    public MySQLBulkInsertData(params MySQLFieldDefinition[] fieldnames)
    {
        Fields = fieldnames;
    }

    public MySQLFieldDefinition[] Fields { get; private set; }
}

这是一个辅助方法:

    /// <summary>
    /// Return a CSV string of the values in the list
    /// </summary>
    /// <returns></returns>
    /// <exception cref="ArgumentNullException"></exception>
    private string ToParameterCSV(IEnumerable<MySQLFieldDefinition> p, int row)
    {
        string csv = p.Aggregate(string.Empty,
            (current, i) => string.IsNullOrEmpty(current)
                    ? string.Format("@{0}{1}",i.FieldName, row)
                    : string.Format("{0},@{2}{1}", current, row, i.FieldName));
        return string.Format("({0})", csv);
    }

也许不是非常优雅,但它可以很好地工作。我需要进度跟踪,因此这一功能被包括在其中,如果需要,可以随意删除该部分。
这将产生类似于您所需输出的SQL命令。
编辑:转CSV:
        /// <summary>
    /// Return a CSV string of the values in the list
    /// </summary>
    /// <param name="intValues"></param>
    /// <param name="separator"></param>
    /// <param name="encloser"></param>
    /// <returns></returns>
    /// <exception cref="ArgumentNullException"></exception>
    public static string ToCSV<T>(this IEnumerable<T> intValues, string separator = ",", string encloser = "")
    {
        string result = String.Empty;
        foreach (T value in intValues)
        {
            result = String.IsNullOrEmpty(result)
                ? string.Format("{1}{0}{1}", value, encloser)
                : String.Format("{0}{1}{3}{2}{3}", result, separator, value, encloser);
        }
        return result;
    }

缺少扩展名为 ToCSV 的文件。 - fubo

6
如果 AddWithValueAdd 方法没有转义字符串,您必须事先处理以避免 SQL 注入和语法错误。
每次仅构建包含 1000 行的 INSERT 语句。这样比起一行一行地添加(1 行 per INSERT),应该可以快速运行 10 倍。一次性添加所有 100K 行是有风险的,可能更慢。有风险是因为您可能会超出某些限制(数据包大小等);更慢是因为需要一个巨大的 ROLLBACK 日志。在每个批次之后进行 COMMIT 或使用 autocommit=1

请使用不超过 max_allowed_packet 字节的插入语句进行添加和构建。这是在处理大型查询时最容易出现问题的限制。 - Salman A
使用字符串拼接或参数的语句?你能给我展示一个例子吗? - fubo

5

加速的一种方法是将所有插入操作包装到一个事务中(SQL-Server 代码):

using (SqlConnection connection = new SqlConnection(CloudConfigurationManager.GetSetting("Sql.ConnectionString")))
{
    conn.Open();
    SqlTransaction transaction = conn.BeginTransaction();

    try 
    {  
        foreach (string commandString in dbOperations)
        {
            SqlCommand cmd = new SqlCommand(commandString, conn, transaction);
            cmd.ExecuteNonQuery();
        }
        transaction.Commit(); 
    } // Here the execution is committed to the DB
    catch (Exception)
    {
      transaction.Rollback();
      throw;
    }
    conn.Close();
}

另一种方法是将CSV文件加载到数据表中,并使用DataAdapter的批处理功能。
 DataTable dtInsertRows = GetDataTable(); 

    SqlConnection connection = new SqlConnection(connectionString);
    SqlCommand command = new SqlCommand("sp_BatchInsert", connection);
    command.CommandType = CommandType.StoredProcedure;
    command.UpdatedRowSource = UpdateRowSource.None;

    // Set the Parameter with appropriate Source Column Name
    command.Parameters.Add("@PersonId", SqlDbType.Int, 4, dtInsertRows.Columns[0].ColumnName);   
    command.Parameters.Add("@PersonName", SqlDbType.VarChar, 100, dtInsertRows.Columns[1].ColumnName);

    SqlDataAdapter adpt = new SqlDataAdapter();
    adpt.InsertCommand = command;
    // Specify the number of records to be Inserted/Updated in one go. Default is 1.
    adpt.UpdateBatchSize = 2;

    connection.Open();
    int recordsInserted = adpt.Update(dtInsertRows);   
    connection.Close();

您可以在这里找到一个很好的示例。

或者您可以使用MySQL BulkLoader C#类:

var bl = new MySqlBulkLoader(connection);
bl.TableName = "mytable";
bl.FieldTerminator = ",";
bl.LineTerminator = "\r\n";
bl.FileName = "myfileformytable.csv";
bl.NumberOfLinesToSkip = 1;
var inserted = bl.Load();
Debug.Print(inserted + " rows inserted.");

如果你在一个命令中执行多个插入操作,使用StringBuilder代替string可能会节省一些空间。

这个事务代码在这种情况下花费了50秒,而之前只需要40秒:( - fubo
@fubo:看起来MySQL和SQL-Server的行为不同。 - Stefan Steiger
注意隔离级别。你可能会减慢你的数据库并导致死锁。 - Dzianis Yafimau
仅供帮助,如果列名中有任何空格,MySqlBulkLoader会报错。因此,请先从CSV文件中删除列名中的空格。 - Himalaya Garg

4
如Stefan Steiger所说,批量插入适用于您的情况。
另一个技巧是使用临时表,因此您不会直接写入生产表,而是写入具有相同结构的临时表。将所有信息写完后,只需交换表格即可。使用临时表方法可以避免对插入操作锁定表格(也可用于更新和删除),在一些项目中MySQL会广泛使用此模式。
此外,禁用表键可能会加快插入速度,但启用它们时也可能会引入一些问题(仅适用于MyISAM引擎)。 添加内容: 假设您有一个名为Products的表:
  • ProductId
  • ProductName
  • ProductPrice
为了进行临时保存,您需要创建一个名为ProductsStaging的临时表,其中包含相同列集。所有操作都在临时表上执行:
UpdateStagingTable();
SwapTables();
UpdateStagingTable();

因为在交换表格后,您的暂存表格中没有新数据,所以需要再次调用相同的方法。 在SwapTables()方法中,您执行了一个SQL语句:

RENAME TABLE Products TO ProductsTemp,
             ProductsStaging TO Products,
             ProductsTemp TO ProductsStagin;

数据操作的速度取决于 MySql 引擎(例如 InnoDB、MyISAM 等),因此您可以通过更改引擎来加快插入速度。


1
您可以使用单个语句使“RENAME”成为原子操作。这样,“Products”始终可用。 - Rick James
@RickJames,是的,你说得对。我已经编辑了答案。 - Alex Sikilinda

4
我在使用EF-MySQL时遇到了类似的问题。 EF插入速度太慢,因此采用了fubo提出的方法。首先,性能得到了巨大改善(约20K记录在约10秒内插入),但是随着表格大小的增长(表格中有约1M条记录),插入速度下降到约250秒。

最终找出了问题所在!表格的主键类型为GUID(UUID-char(36))。由于UUID无法顺序索引且每次插入都需要重新建立索引,导致性能下降。

解决方法是将PK替换为bigint(或int)并将其设置为identity列。这样性能得到了改善,在表格中插入平均需要约12秒,而表格中有超过2M条记录!

如果有人遇到类似的问题,我想分享这个发现!


2
我已经找到了避免使用文件进行批量插入的方法。在 这个连接器 中实现了从流中加载。因此,加载可以像这样完成。
  public void InsertData(string table, List<string> columns, List<List<object>> data) {

  using (var con = OpenConnection() as MySqlConnection) {
    var bulk = new MySqlBulkLoader(con);
    using (var stream = new MemoryStream()) {
      bulk.SourceStream = stream;
      bulk.TableName = table;
      bulk.FieldTerminator = ";";
      var writer = new StreamWriter(stream);

      foreach (var d in data)
        writer.WriteLine(string.Join(";", d));

      writer.Flush();
      stream.Position = 0;
      bulk.Load();
    }
  }
}

1
谢谢您!只需记得设置列即可。 - user890332

0

我的建议只是一个想法,而不是示例或解决方案。如果您不使用INSERT,而是将数据作为多个参数传递(不一定要一次性传递所有100K,例如,您可以使用1K的捆绑包),传递给存储过程,该存储过程本身执行INSERT操作,那会怎样呢?


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接