Entity Framework - 如何快速插入/更新数百万条记录

3
我需要向MySQL数据库插入190万条新记录。我使用C# Entity Framework进行操作,但这个过程非常慢。按照当前速度,处理这些记录需要几天时间。
我做错了什么?如何加快速度?
我的数据库中有2个表:哈希(Hashes)和类别(Categories)。每个哈希应该是唯一的,可以有多个类别,每个哈希只能有一个活动类别。
我需要先检查哈希是否存在。如果存在,那么我需要找到当前的类别,将其停用并添加新的类别。
问题在于我的try{}语句大约需要150毫秒,执行SaveChanges()方法的块需要15-30秒左右。因此,以这种方式处理190万条记录需要数天时间。
using (var reader = new StreamReader(File.OpenRead(filepath)))
using (MySQLContext db = new MySQLContext(options))
{
    // Disable auto detect changes
    db.ChangeTracker.AutoDetectChangesEnabled = false;
    int loopCounter = 0;    
    string line;

    // Load up the db tables in memory
    var hashes = db.Hashes.Select(x => x).ToList();
    var category = db.Categories.Select(a => a).ToList();

    while ((line = reader.ReadLine()) != null)
    {
        var matches = Regex.Matches(line, "(?<MD5>[a-zA-Z0-9]+)(?<Category>[0-9])");

        InputHashModel inputHash = new InputHashModel()
        {
            MD5 = matches[0].Groups["MD5"].Value,
            Category = matches[0].Groups["Category"].Value
        };

        try
        {
            // Check if hash already exists
            Hash hash = hashes.Where(h => h.MD5 == inputHash.MD5).FirstOrDefault();

            // If hash doesn't exist - add it
            if (hash == null)
                hash = new Hash(inputHash.MD5);
            else
            {
                // Check if category already exists
                Category category = categories.Where(a => a.Active == true && a.HashId == hash.Id).FirstOrDefault();

                // If it exists - deactivate it
                if (category != null)
                {
                    // If the same category already exists - proceed to next hash
                    if (category.Source == "ThisInput" && category.Category == inputHash.Category)
                        {
                            loopCounter++
                            continue;
                        }

                    category.Active = false;
                    category.DeactivatedTimestamp = DateTime.Now;
                }
            }

            // Add new category
            Category new_category = new Category() { Hash = hash, Source = "ThisInput", Category = inputHash.Category, Active = true);
            db.Categories.Add(new_category);

            // Save changes every 1000
            if (loopCounter % 1000 == 0)
            {
                db.ChangeTracker.DetectChanges();
                db.SaveChanges();
            }
        }
        catch (Exception e)
        {
            Console.WriteLine("Exception: " + e);
        }

        loopCounter++;
    }

    db.ChangeTracker.AutoDetectChangesEnabled = true;
    db.SaveChanges();

    Console.WriteLine("Finished");
}

7
不要使用像EF这样的ORM。ORM用于映射对象到关系结构,但这里没有对象,只有大量的行数据。请使用数据库的批量导入功能来加载数据。在SQL Server中,您可以使用SqlBulkCopy。对于MySQL,请使用MySQLBulkLoader。 - Panagiotis Kanavos
同意@PanagiotisKanavos的观点...此外,“每个哈希只有一个活动类别”可能意味着您需要在哈希和其活动类别之间添加关系,并在所有情况下避免查找。 - rfmodulator
也没有所谓的批量更新。批量加载之所以有效,是因为客户端可以将更改流式传输到服务器,而服务器则可以最小化日志记录地应用这些更改。但是对于更新操作,这种方法不可行。ETL任务通常会将更改加载到一个暂存表中,然后使用键连接目标表并更新匹配的行。 - Panagiotis Kanavos
有一个BulkUpdate()函数,但它是第三方的。编辑:请看下面我的回答。 - ZeW
2个回答

2

这永远不会是最快的方法,但至少你需要避免在更改跟踪器中累积所有实体。例如,在每次SaveChanges()运行后。

    foreach (var e in db.ChangeTracker.Entries())
    {
        e.State = EntityState.Detached;
    }

这并没有帮助。ORM不适用于ETL,尤其是批量操作。操作需要99年而不是100年。这就是为什么所有SQL Server MVP和团队成员都讨厌ORM的原因。此外,OP已经禁用了更改跟踪。 - Panagiotis Kanavos
1
这里唯一的答案是“不要使用ORM”。 - Panagiotis Kanavos
更改跟踪仅影响查询。 “操作将从需要100年变为需要99年。” 这是不正确的。问题并不在于 EF 本身。使用此方法,EF可以每秒插入1000行或更多行,只要您控制好更改跟踪器。如果您不这样做,更改的成本会不断增加,有效的插入速率将降至零。EF永远不会是最快的方法,但对于许多场景(如迁移),拥有一个较慢但简单的方法可能是可以接受的。 - David Browne - Microsoft
@PanagiotisKanavos SQL Server MVPs 不喜欢 ORM,因为开发人员在不应该使用它们的时候使用了它们。ORM非常棒。 - André Mendonça
@AndreMendonca 这是其中一种情况。 - Panagiotis Kanavos
OP的问题是“按照当前速度,处理这些记录需要几天时间”,因为“执行SaveChanges()的代码块大约需要15-30秒”。并不是“我需要以最快的方式加载这些数据”。 - David Browne - Microsoft

0

我通过使用EntityFramework ExtensionsBulkInsert()BulkUpdate()函数来实现这一点。

不过,我采用了稍微不同的方法...

  1. 将具有新关系的新条目导入EF模型中。
  2. 将所有条目从数据库下载到内存中。
  3. 使用.Intersects(myCustomComparer)比较新条目和来自数据库的条目。执行两次以将唯一和重复的条目分开成两个列表。
  4. 仅为重复的条目下载关系条目(按ID查找)。运行关系条目并进行比较-如果需要更新,则将其添加到列表中。如果需要新条目,则将其添加到另一个列表中。
  5. 更新关系条目并添加新条目。
  6. 批量插入唯一条目及其关系条目。

以下是函数的代码:

public class HashMD5Comparer : IEqualityComparer<Hash>
{
 //Products are equal if their names and product numbers are equal.
    public bool Equals(Hash x, Hash y)
{
    //Check whether the compared objects reference the same data.
    if (Object.ReferenceEquals(x, y)) return true;

    //Check whether any of the compared objects is null.
    if (Object.ReferenceEquals(x, null) || Object.ReferenceEquals(y, null))
        return false;

    //Check whether the hash' properties are equal.
    return x.MD5 == y.MD5;
}

// If Equals() returns true for a pair of objects
// then GetHashCode() must return the same value for these objects.

public int GetHashCode(Hash hash)
{
    //Check whether the object is null
    if (Object.ReferenceEquals(hash, null)) return 0;

    //Get hash code for the Name field if it is not null.
    int hashMD5 = hash.MD5 == null ? 0 : hash.MD5.GetHashCode();

    //Calculate the hash code for the hash.
    return hashMD5;
}
}
public class HashComparer
{
private static Hash[] uniqueHashes;
private static Hash[] duplicateHashes;
private static Hash[] duplicateHashesInDb;

private static void SortHashes(Hash[] hashes)
{
    Hash[] hashesInDatabase;

    // Download hashes from database
    using (MySQLContext db = new MySQLContext())
    {
        hashesInDatabase = db.Hashes.Where(h => h.MD5 != null).ToArray();
    }

    // Find duplicates in database
    duplicateHashes = hashes.Intersect(hashesInDatabase, new HashMD5Comparer()).ToArray();
    duplicatehashesInDatabase = hashesInDatabase.Intersect(hashes, new HashMD5Comparer()).ToArray();

    // Find uniques in database
    uniqueHashes = hashes.Except(duplicateHashes, new HashMD5Comparer()).ToArray();
}

private static void ActionDuplicateHashes()
{
    Assessment[] assessmentsInDatabase;
    List<Assessment> assessmentsToDeactivate = new List<Assessment>();
    List<Assessment> assessmentsToAdd = new List<Assessment>();

    // Download assessments from database
    using (MySQLContext db = new MySQLContext(GenerateMySQLOptions()))
    {
        var duplicateHashIds = duplicateHashesInDb.Select(h => h.Id).ToArray();
        assessmentsInDatabase = db.Assessments.Where(a => duplicateHashIds.Contains(a.HashId)).ToArray();
    }

    foreach (var inputHash in duplicateHashes)
    {
        // Lookup the hash in the database to get the ID
        var liveHashId = Array.Find(duplicateHashesInDb, h => h.MD5 == inputHash.MD5).Id;

        // Find the assessment in the database to compare (and deactive if needed)
        var liveAsssessment = Array.Find(assessmentsInDatabase, a => a.HashId == liveHashId);

        // Get the new assessment of the hash
        var newAssessment = inputHash.Assessments.FirstOrDefault();

        if (newAssessment == null)
        {
            Console.WriteLine($"Failed lookup for new assessment {inputHash.MD5}");
            return;
        }
        // Set the hashId (relationship) for the new assessment
        newAssessment.HashId = liveHashId;

        if (liveAsssessment != null)
        {
            if (liveAsssessment.Origin == newAssessment.Origin &&
                liveAsssessment.Category == newAssessment.Category)
            {
                // Exact duplicate - leave as is
            }
            else
            {
                // Deactivate the current assessment in the database
                liveAsssessment.Active = false;

                // Add the assessment to a list to deactive (update)
                assessmentsToDeactivate.Add(liveAsssessment);
                // Add the new assessment that will be added once the old one gets deactivated
                assessmentsToAdd.Add(newAssessment);
            }
        }
        else
        {
            // No assessment for the hash in the database - just add a new one 
            assessmentsToAdd.Add(newAssessment);
        }
    }

    // Bulk update the assessments in the database that are to be deactivated
    using (MySQLContext db = new MySQLContext(GenerateMySQLOptions()))
    {
        db.Assessments.BulkUpdate(assessmentsToDeactivate);
    }

    // Bulk insert the new assessments
    using (MySQLContext db = new MySQLContext(GenerateMySQLOptions()))
    {
        db.Assessments.BulkInsert(assessmentsToAdd);
    }
}

private static void ActionUniqueHashes()
{
    // Bulk insert all unique hashes and their assessments
    using (MySQLContext db = new MySQLContext())
    {
        // options.IncludeGraph adds any relationships to the database as well
        db.Hashes.BulkInsert(uniqueHashes, options => options.IncludeGraph = true);
    }
}
}

还有更多的优化需要完成,因为这个程序使用了大量的RAM。特别是在进行唯一哈希批量插入时(不确定为什么)。但总体而言,它能够正常工作。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接