Entity framework 中如何提高批量插入性能

116

我希望通过Entity Framework在表中插入20000条记录,但需要2分钟左右。除了使用存储过程之外,是否有其他方法来提高性能?这是我的代码:

 foreach (Employees item in sequence)
 {
   t = new Employees ();
   t.Text = item.Text;
   dataContext.Employees.AddObject(t);                  
 }
 dataContext.SaveChanges();

3
请查看此解决方案 关于如何使用 SqlBulkCopy 处理 Generic List<T> 。适用于 Code First POCOs,速度会更快。 - shox
我在一些数据上使用了这个,将我的插入时间从半小时以上改进到了约15秒(大约50K行)。 - shox
SqlBulkCopy是(并且一直是)将记录插入SQL Server的最快方法,我认为我在下面的答案中提供的实现比@dubbreak的实现更好。我描述的陷阱也适用于那段代码。 - Mick
11个回答

220

如果您正在使用DbContext,那么有几个改进的机会:

设置:

yourContext.Configuration.AutoDetectChangesEnabled = false;
yourContext.Configuration.ValidateOnSaveEnabled = false;

对于插入操作,建议使用每组100个保存的方式 SaveChanges()... 或者您可以尝试使用每组1000个条目,并查看性能变化。

由于在所有这些插入过程中,上下文是相同的并且正在变得越来越大,因此您可以每1000次插入重建一次上下文对象 var yourContext = new YourContext(); 我认为这是一个很大的收获。

通过在我的数据导入流程中进行这些改进,将时间从7分钟缩短到6秒。

实际数字可能不是100或1000,您可以尝试并进行微调。


16
我做了这个,我的数据插入了19,000行,所需时间从20分钟缩短到不到10秒。 - Stuntbeaver
5
用这个方法处理40000行数据大约只需要4秒钟。我没有更新上下文,只是使用了配置更改,并且每1000行保存一次。非常棒。 - Lotok
4
我可以确认。这将使批量导入的效率提高100000%! - Tys
1
我甚至不需要将它保存在100个或1000个对象的包中,就可以看到巨大的性能提升。在生产代码中使用一些魔数作为包大小可能是危险的,因为它可能在您的环境中运行良好,但在客户端却不行。无论如何,这种方法非常有效。 - Ondra
1
我认为我是唯一一个这样的人,但对我来说没有任何影响。 - devuxer
显示剩余8条评论

39

以这种方式进行操作,没有办法强制EF提高性能。问题是EF会在单独的数据库往返中执行每个插入操作。太棒了吧?甚至DataSet都支持批处理。查看这篇文章获取一些解决方法。另一个解决方法可以是使用自定义存储过程来接收表值参数,但这需要使用原始的ADO.NET。


6
你可以检查我的答案,有改进表现的空间。 - Romias
13
我不确定为什么这个被接受的答案如此荒谬,存在一些方法可以在使用EF进行大量插入时提高性能。Romias提到了其中的一个,另一个是将所有插入操作包装在单个事务范围内。如果这些选项仍然无法满足您的性能需求(如果是这种情况,则可能存在其他问题),您可以从context中获取连接对象,并使用SQLBulkCopy对象加载数据。 - Zac Howland

25
使用以下代码,您可以扩展部分上下文类,添加一个方法,以便将实体对象集合批量复制到数据库中。只需将类的名称从MyEntities替换为您的实体类名称并将其添加到项目中的正确命名空间中。之后,您只需要调用BulkInsertAll方法,并交付要插入的实体对象即可。请勿重复使用上下文类,而是每次使用时创建一个新实例。在某些版本的EF中,这是必需的,因为在此处使用的SQLConnection关联的身份验证数据在使用该类一次后会丢失。我不知道为什么。
此版本适用于EF 5。
public partial class MyEntities
{
    public void BulkInsertAll<T>(T[] entities) where T : class
    {
        var conn = (SqlConnection)Database.Connection;

        conn.Open();

        Type t = typeof(T);
        Set(t).ToString();
        var objectContext = ((IObjectContextAdapter)this).ObjectContext;
        var workspace = objectContext.MetadataWorkspace;
        var mappings = GetMappings(workspace, objectContext.DefaultContainerName, typeof(T).Name);

        var tableName = GetTableName<T>();
        var bulkCopy = new SqlBulkCopy(conn) { DestinationTableName = tableName };

        // Foreign key relations show up as virtual declared 
        // properties and we want to ignore these.
        var properties = t.GetProperties().Where(p => !p.GetGetMethod().IsVirtual).ToArray();
        var table = new DataTable();
        foreach (var property in properties)
        {
            Type propertyType = property.PropertyType;

            // Nullable properties need special treatment.
            if (propertyType.IsGenericType &&
                propertyType.GetGenericTypeDefinition() == typeof(Nullable<>))
            {
                propertyType = Nullable.GetUnderlyingType(propertyType);
            }

            // Since we cannot trust the CLR type properties to be in the same order as
            // the table columns we use the SqlBulkCopy column mappings.
            table.Columns.Add(new DataColumn(property.Name, propertyType));
            var clrPropertyName = property.Name;
            var tableColumnName = mappings[property.Name];
            bulkCopy.ColumnMappings.Add(new SqlBulkCopyColumnMapping(clrPropertyName, tableColumnName));
        }

        // Add all our entities to our data table
        foreach (var entity in entities)
        {
            var e = entity;
            table.Rows.Add(properties.Select(property => GetPropertyValue(property.GetValue(e, null))).ToArray());
        }

        // send it to the server for bulk execution
        bulkCopy.BulkCopyTimeout = 5 * 60;
        bulkCopy.WriteToServer(table);

        conn.Close();
    }

    private string GetTableName<T>() where T : class
    {
        var dbSet = Set<T>();
        var sql = dbSet.ToString();
        var regex = new Regex(@"FROM (?<table>.*) AS");
        var match = regex.Match(sql);
        return match.Groups["table"].Value;
    }

    private object GetPropertyValue(object o)
    {
        if (o == null)
            return DBNull.Value;
        return o;
    }

    private Dictionary<string, string> GetMappings(MetadataWorkspace workspace, string containerName, string entityName)
    {
        var mappings = new Dictionary<string, string>();
        var storageMapping = workspace.GetItem<GlobalItem>(containerName, DataSpace.CSSpace);
        dynamic entitySetMaps = storageMapping.GetType().InvokeMember(
            "EntitySetMaps",
            BindingFlags.GetProperty | BindingFlags.NonPublic | BindingFlags.Instance,
            null, storageMapping, null);

        foreach (var entitySetMap in entitySetMaps)
        {
            var typeMappings = GetArrayList("TypeMappings", entitySetMap);
            dynamic typeMapping = typeMappings[0];
            dynamic types = GetArrayList("Types", typeMapping);

            if (types[0].Name == entityName)
            {
                var fragments = GetArrayList("MappingFragments", typeMapping);
                var fragment = fragments[0];
                var properties = GetArrayList("AllProperties", fragment);
                foreach (var property in properties)
                {
                    var edmProperty = GetProperty("EdmProperty", property);
                    var columnProperty = GetProperty("ColumnProperty", property);
                    mappings.Add(edmProperty.Name, columnProperty.Name);
                }
            }
        }

        return mappings;
    }

    private ArrayList GetArrayList(string property, object instance)
    {
        var type = instance.GetType();
        var objects = (IEnumerable)type.InvokeMember(property, BindingFlags.GetProperty | BindingFlags.NonPublic | BindingFlags.Instance, null, instance, null);
        var list = new ArrayList();
        foreach (var o in objects)
        {
            list.Add(o);
        }
        return list;
    }

    private dynamic GetProperty(string property, object instance)
    {
        var type = instance.GetType();
        return type.InvokeMember(property, BindingFlags.GetProperty | BindingFlags.NonPublic | BindingFlags.Instance, null, instance, null);
    }
}

这个版本适用于 EF 6。

public partial class CMLocalEntities
{
    public void BulkInsertAll<T>(T[] entities) where T : class
    {
        var conn = (SqlConnection)Database.Connection;

        conn.Open();

        Type t = typeof(T);
        Set(t).ToString();
        var objectContext = ((IObjectContextAdapter)this).ObjectContext;
        var workspace = objectContext.MetadataWorkspace;
        var mappings = GetMappings(workspace, objectContext.DefaultContainerName, typeof(T).Name);

        var tableName = GetTableName<T>();
        var bulkCopy = new SqlBulkCopy(conn) { DestinationTableName = tableName };

        // Foreign key relations show up as virtual declared 
        // properties and we want to ignore these.
        var properties = t.GetProperties().Where(p => !p.GetGetMethod().IsVirtual).ToArray();
        var table = new DataTable();
        foreach (var property in properties)
        {
            Type propertyType = property.PropertyType;

            // Nullable properties need special treatment.
            if (propertyType.IsGenericType &&
                propertyType.GetGenericTypeDefinition() == typeof(Nullable<>))
            {
                propertyType = Nullable.GetUnderlyingType(propertyType);
            }

            // Since we cannot trust the CLR type properties to be in the same order as
            // the table columns we use the SqlBulkCopy column mappings.
            table.Columns.Add(new DataColumn(property.Name, propertyType));
            var clrPropertyName = property.Name;
            var tableColumnName = mappings[property.Name];
            bulkCopy.ColumnMappings.Add(new SqlBulkCopyColumnMapping(clrPropertyName, tableColumnName));
        }

        // Add all our entities to our data table
        foreach (var entity in entities)
        {
            var e = entity;
            table.Rows.Add(properties.Select(property => GetPropertyValue(property.GetValue(e, null))).ToArray());
        }

        // send it to the server for bulk execution
        bulkCopy.BulkCopyTimeout = 5*60;
        bulkCopy.WriteToServer(table);

        conn.Close();
    }

    private string GetTableName<T>() where T : class
    {
        var dbSet = Set<T>();
        var sql = dbSet.ToString();
        var regex = new Regex(@"FROM (?<table>.*) AS");
        var match = regex.Match(sql);
        return match.Groups["table"].Value;
    }

    private object GetPropertyValue(object o)
    {
        if (o == null)
            return DBNull.Value;
        return o;
    }

    private Dictionary<string, string> GetMappings(MetadataWorkspace workspace, string containerName, string entityName)
    {
        var mappings = new Dictionary<string, string>();
        var storageMapping = workspace.GetItem<GlobalItem>(containerName, DataSpace.CSSpace);
        dynamic entitySetMaps = storageMapping.GetType().InvokeMember(
            "EntitySetMaps",
            BindingFlags.GetProperty | BindingFlags.Public | BindingFlags.Instance,
            null, storageMapping, null);

        foreach (var entitySetMap in entitySetMaps)
        {
            var typeMappings = GetArrayList("EntityTypeMappings", entitySetMap);
            dynamic typeMapping = typeMappings[0];
            dynamic types = GetArrayList("Types", typeMapping);

            if (types[0].Name == entityName)
            {
                var fragments = GetArrayList("MappingFragments", typeMapping);
                var fragment = fragments[0];
                var properties = GetArrayList("AllProperties", fragment);
                foreach (var property in properties)
                {
                    var edmProperty = GetProperty("EdmProperty", property);
                    var columnProperty = GetProperty("ColumnProperty", property);
                    mappings.Add(edmProperty.Name, columnProperty.Name);
                }
            }
        }

        return mappings;
    }

    private ArrayList GetArrayList(string property, object instance)
    {
        var type = instance.GetType();
        var objects = (IEnumerable)type.InvokeMember(
            property, 
            BindingFlags.GetProperty | BindingFlags.Public | BindingFlags.Instance, null, instance, null);
        var list = new ArrayList();
        foreach (var o in objects)
        {
            list.Add(o);
        }
        return list;
    }

    private dynamic GetProperty(string property, object instance)
    {
        var type = instance.GetType();
        return type.InvokeMember(property, BindingFlags.GetProperty | BindingFlags.Public | BindingFlags.Instance, null, instance, null);
    }

}

最后,给你们Linq-To-Sql爱好者一点小东西。
partial class MyDataContext
{
    partial void OnCreated()
    {
        CommandTimeout = 5 * 60;
    }

    public void BulkInsertAll<T>(IEnumerable<T> entities)
    {
        entities = entities.ToArray();

        string cs = Connection.ConnectionString;
        var conn = new SqlConnection(cs);
        conn.Open();

        Type t = typeof(T);

        var tableAttribute = (TableAttribute)t.GetCustomAttributes(
            typeof(TableAttribute), false).Single();
        var bulkCopy = new SqlBulkCopy(conn) { 
            DestinationTableName = tableAttribute.Name };

        var properties = t.GetProperties().Where(EventTypeFilter).ToArray();
        var table = new DataTable();

        foreach (var property in properties)
        {
            Type propertyType = property.PropertyType;
            if (propertyType.IsGenericType &&
                propertyType.GetGenericTypeDefinition() == typeof(Nullable<>))
            {
                propertyType = Nullable.GetUnderlyingType(propertyType);
            }

            table.Columns.Add(new DataColumn(property.Name, propertyType));
        }

        foreach (var entity in entities)
        {
            table.Rows.Add(properties.Select(
              property => GetPropertyValue(
              property.GetValue(entity, null))).ToArray());
        }

        bulkCopy.WriteToServer(table);
        conn.Close();
    }

    private bool EventTypeFilter(System.Reflection.PropertyInfo p)
    {
        var attribute = Attribute.GetCustomAttribute(p, 
            typeof (AssociationAttribute)) as AssociationAttribute;

        if (attribute == null) return true;
        if (attribute.IsForeignKey == false) return true; 

        return false;
    }

    private object GetPropertyValue(object o)
    {
        if (o == null)
            return DBNull.Value;
        return o;
    }
}

3
有人知道为什么我尝试这样做时会出现错误,引用了“EntitySetMaps”的错误消息:“Method 'System.Data.Entity.Core.Mapping.EntityContainerMapping.EntitySetMaps' not found." - cjb110
你正在使用哪个版本的EF? - Måns Tånneryd
啊,糟糕了,你的代码是 EF 6 版本和 Nuget 上的 6.1.1 版本。我正在使用 Code First。"慢速" 方法可以正常工作。 - cjb110
1
他们可能在6.1.1中更改了一些元数据属性名称。我会去查一下。 - Måns Tånneryd
@cjb110,我认为6.1.x的一些方法已经改变了。EntitySetMappings对我来说似乎是有效的。 - Chris Woolum
3
@MånsTånneryd 谢谢!我使用的是 EF 6.1.3,确实属性名称已更改。所以我把 GetMappings() 改成了: EntitySetMaps 改为 EntitySetMappings;Types 改为 EntityTypes; MappingFragments 改为 Fragments; AllProperties 改为 PropertyMappings; EdmProperty 改为 Property; ColumnProperty 改为 Column - DarkMFJ

7
也许这里的答案能帮助您:链接。似乎您想定期处理上下文。这是因为随着附加实体增长,上下文会变得越来越大。

5

您的代码存在两个主要性能问题:

  • 使用Add方法
  • 使用SaveChanges方法

使用Add方法

每添加一个实体,Add方法就会变得越来越慢。

参见:http://entityframework.net/improve-ef-add-performance

例如,通过以下方式添加10,000个实体:

  • Add(需要大约105,000毫秒)
  • AddRange(需要大约120毫秒)

注意:实体尚未保存在数据库中!

问题在于,Add方法在每个添加的实体上都尝试进行DetectChanges,而AddRange则在所有实体添加到上下文后只执行一次。

常见解决方案包括:

  • 使用AddRange代替Add
  • 将AutoDetectChanges设置为false
  • 将SaveChanges拆分为多个批次

使用SaveChanges方法

Entity Framework并不适用于批量操作。对于每个要保存的实体,都会执行一次数据库往返。

因此,如果要插入20,000条记录,则会执行20,000次数据库往返,这是疯狂的

有一些支持批量插入的第三方库可用:

  • Z.EntityFramework.Extensions(推荐
  • EFUtilities
  • EntityFramework.BulkInsert

参见:Entity Framework Bulk Insert library

注意,在选择批量插入库时要小心。只有Entity Framework Extensions支持所有类型的关联和继承,并且它是唯一仍受支持的库。


免责声明:我是Entity Framework Extensions的所有者

此库允许您执行所需的所有批量操作:

  • 批量SaveChanges
  • 批量插入
  • 批量删除
  • 批量更新
  • 批量合并

示例

// Easy to use
context.BulkSaveChanges();

// Easy to customize
context.BulkSaveChanges(bulk => bulk.BatchSize = 100);

// Perform Bulk Operations
context.BulkDelete(customers);
context.BulkInsert(customers);
context.BulkUpdate(customers);

// Customize Primary Key
context.BulkMerge(customers, operation => {
   operation.ColumnPrimaryKeyExpression = 
        customer => customer.Code;
});

编辑:在评论中回答问题

你创建的库是否有推荐的最大批量插入大小?

不要太高,也不要太低。没有适合所有场景的特定值,因为它取决于多个因素,如行大小、索引、触发器等。

通常建议在4000左右。

还有一种方法可以将其全部绑定在一个事务中而不必担心超时吗?

您可以使用Entity Framework(实体框架)事务。如果启动了事务,我们的库将使用该事务。但是,请注意,需要太长时间的事务也可能带来一些问题,例如某些行/索引/表锁定。


你创建的库中,每个批量插入的推荐最大大小是多少?此外,有没有一种方法将其全部绑定在一个事务中,而不必担心超时问题?谢谢! - darewreck

4

目前还没有更好的方法,但将SaveChanges放在循环内部可能会为大约10个项目带来微小的改进。

int i = 0;

foreach (Employees item in sequence)
{
   t = new Employees ();
   t.Text = item.Text;
   dataContext.Employees.AddObject(t);   

   // this will add max 10 items together
   if((i % 10) == 0){
       dataContext.SaveChanges();
       // show some progress to user based on
       // value of i
   }
   i++;
}
dataContext.SaveChanges();

您可以将10调整为更接近于更好的性能。这不会极大地提高速度,但它将允许您向用户显示一些进度并使其更加用户友好。

4

在具有1个实例的基本网站的Azure环境中。 我尝试使用for循环插入1000条记录,总共有25000条记录。串行执行需要11.5分钟,但并行执行只需要不到一分钟。因此,我建议使用TPL(任务并行库)。

         var count = (you collection / 1000) + 1;
         Parallel.For(0, count, x =>
        {
            ApplicationDbContext db1 = new ApplicationDbContext();
            db1.Configuration.AutoDetectChangesEnabled = false;

            var records = members.Skip(x * 1000).Take(1000).ToList();
            db1.Members.AddRange(records).AsParallel();

            db1.SaveChanges();
            db1.Dispose();
        });

让我澄清一下这段代码: 第1行:var count = (your collections.Count / 1000) + 1; 第7行:members 是您的集合,要么是...当我针对我的情况运行此代码时,我遇到了这个错误_Transaction (Process ID 80) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction._ - Abdurrahman I.
对于可能发生的异常情况,我更愿意将dbContext的创建和释放放入using块中。 - michal.jakubeczy

4
更好的方法是完全跳过Entity Framework,使用SqlBulkCopy类进行此操作。其他操作可以像以前一样继续使用EF。
这增加了解决方案的维护成本,但无论如何,与使用EF相比,有助于将大量对象插入数据库所需的时间减少一到两个数量级。
这里有一篇文章比较了具有父子关系的对象的SqlBulkCopy类和EF(还描述了实现批量插入所需的设计更改):如何将复杂对象批量插入SQL Server数据库

外键问题或唯一键冲突会发生什么?整个操作会被回滚吗? - Dan Esparza
1
考虑将批量插入视为业务交易,而不是系统交易。你的问题应该传递给业务所有者来决定。我见过实践中的不同选择,对于我们程序员来说都一样好:(1)回滚所有并使用户更正数据;(2)提交到某个点并通知用户其余部分未被处理,(3)跳过并继续,然后使用失败的记录通知用户。解决方案2和3需要在异常处理方面进行一些调整,通常不容易实现。 - Zoran Horvat

3
尝试使用批量插入(Bulk Insert)... 如果您有一组实体,例如storeEntities,可以使用SqlBulkCopy将它们存储如下:http://code.msdn.microsoft.com/LinqEntityDataReader
        var bulkCopy = new SqlBulkCopy(connection);
        bulkCopy.DestinationTableName = TableName;
        var dataReader = storeEntities.AsDataReader();
        bulkCopy.WriteToServer(dataReader);

这段代码有一个问题需要注意。确保实体框架的定义与表定义完全一致,确保实体模型中的属性与 SQL Server 表中的列以相同的顺序排列。否则,会导致异常。

0
虽然回复晚了,但我还是发表答案,因为我也遇到了同样的问题。 我创建了一个新的 GitHub 项目,目前它支持使用 SqlBulkCopy 透明地进行 Sql server 的批量插入/更新/删除。

https://github.com/MHanafy/EntityExtensions

还有其他好东西,希望它将来能够扩展更多功能。

使用它就像这样简单:

var insertsAndupdates = new List<object>();
var deletes = new List<object>();
context.BulkUpdate(insertsAndupdates, deletes);

希望能够帮到你!

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接