将C#数据表批量导入到PostgreSQL表中

7
我有一个包含数千条记录的datatable。 我有一个与datatable相同字段的postgres表。 我想每天清空这个表并重新填充datatable的数据。我已经看到了SQL批量复制,但它在postgres上不可用。 那么,哪种方法最有效?
  • 每条记录插入一次
  • 多个插入:insert into table values (1,1),(1,2),(1,3),(2,1);
  • 使用linq从datatable中选择并插入postgres?没有头绪...
谢谢。

为什么不使用一个每天更新的物化视图? - Daniël Tulp
尝试使用 **EFCore.BulkExtensions**。 - borisdj
5个回答

7
PostgreSQL确实有批量复制功能(实际上被称为copy),并且它有一个很好的.NET包装器。如果你要加载数据,你需要使用NpgsqlCopyIn,如果你要提取数据,你可以使用NpgsqlCopyOut。根据您提供的细节比较模糊,我不知道您datatable中的字段或实际数据库的任何信息,所以这是一个简单示例,演示如何使用C#/PostgreSQL批量插入数据到表中:
    NpgsqlCopyIn copy = new NpgsqlCopyIn("copy table1 from STDIN WITH NULL AS '' CSV;",
        conn);
    copy.Start();

    NpgsqlCopySerializer cs = new NpgsqlCopySerializer(conn);
    cs.Delimiter = ",";

    foreach (var record in RecordList)
    {
        cs.AddString(record.UserId);
        cs.AddInt32(record.Age);
        cs.AddDateTime(record.HireDate);
        cs.EndRow();
    }

    cs.Close();
    copy.End();

-- 2019年8月27日更新 --

Npgsql的结构已经完全改变。以下是相同示例的引导程序,使用二进制导入(文本也可用):

using (var writer = conn.BeginBinaryImport(
    "copy user_data.part_list from STDIN (FORMAT BINARY)"))
{
    foreach (var record in RecordList)
    {
        writer.StartRow();
        writer.Write(record.UserId);
        writer.Write(record.Age, NpgsqlTypes.NpgsqlDbType.Integer);
        writer.Write(record.HireDate, NpgsqlTypes.NpgsqlDbType.Date);
    }

    writer.Complete();
}

1
NpgsqlCopyIn在v3以上版本中不再支持,参见https://dev59.com/eI_ea4cB1Zd3GeqPUejr。 - Ralph Willgoss
1
没错...我用新的结构更新了它。做得好。 - Hambone

5

也许您可以查看我在另一个答案中描述的小助手,我为此问题创建了一个小助手,使用另一个助手非常容易: https://dev59.com/DW025IYBdhLWcg3wjGtO#46063313

编辑: 我最近遇到了类似的问题,但我们正在使用Postgresql。我想使用有效的批量插入,结果发现这相当困难。我没有找到任何适用于此数据库的适当免费库。我只找到了这个助手: https://bytefish.de/blog/postgresql_bulk_insert/ 它也在Nuget上。我编写了一个小映射器,自动映射属性,就像Entity Framework一样:

public static PostgreSQLCopyHelper<T> CreateHelper<T>(string schemaName, string tableName)
        {
            var helper = new PostgreSQLCopyHelper<T>(schemaName, "\"" + tableName + "\"");
            var properties = typeof(T).GetProperties();
            foreach(var prop in properties)
            {
                var type = prop.PropertyType;
                if (Attribute.IsDefined(prop, typeof(KeyAttribute)))
                    continue;
                switch (type)
                {
                    case Type intType when intType == typeof(int) || intType == typeof(int?):
                        {
                            helper = helper.MapInteger("\"" + prop.Name + "\"",  x => (int?)typeof(T).GetProperty(prop.Name).GetValue(x, null));
                            break;
                        }
                    case Type stringType when stringType == typeof(string):
                        {
                            helper = helper.MapText("\"" + prop.Name + "\"", x => (string)typeof(T).GetProperty(prop.Name).GetValue(x, null));
                            break;
                        }
                    case Type dateType when dateType == typeof(DateTime) || dateType == typeof(DateTime?):
                        {
                            helper = helper.MapTimeStamp("\"" + prop.Name + "\"", x => (DateTime?)typeof(T).GetProperty(prop.Name).GetValue(x, null));
                            break;
                        }
                    case Type decimalType when decimalType == typeof(decimal) || decimalType == typeof(decimal?):
                        {
                            helper = helper.MapMoney("\"" + prop.Name + "\"", x => (decimal?)typeof(T).GetProperty(prop.Name).GetValue(x, null));
                            break;
                        }
                    case Type doubleType when doubleType == typeof(double) || doubleType == typeof(double?):
                        {
                            helper = helper.MapDouble("\"" + prop.Name + "\"", x => (double?)typeof(T).GetProperty(prop.Name).GetValue(x, null));
                            break;
                        }
                    case Type floatType when floatType == typeof(float) || floatType == typeof(float?):
                        {
                            helper = helper.MapReal("\"" + prop.Name + "\"", x => (float?)typeof(T).GetProperty(prop.Name).GetValue(x, null));
                            break;
                        }
                    case Type guidType when guidType == typeof(Guid):
                        {
                            helper = helper.MapUUID("\"" + prop.Name + "\"", x => (Guid)typeof(T).GetProperty(prop.Name).GetValue(x, null));
                            break;
                        }
                }
            }
            return helper;
        }

我以以下方式使用它(我有一个名为 Undertaking 的实体):
var undertakingHelper = BulkMapper.CreateHelper<Model.Undertaking>("dbo", nameof(Model.Undertaking));
undertakingHelper.SaveAll(transaction.UnderlyingTransaction.Connection as Npgsql.NpgsqlConnection, undertakingsToAdd));

我展示了一个带有事务的示例,但也可以使用从上下文检索到的普通连接来完成。undertakingsToAdd是普通实体记录的可枚举集合,我想将其批量插入到数据库中。

这个解决方案是我经过几个小时的研究和尝试后得出的,正如你所期望的那样,它更快,最终易于使用,而且免费!我真的建议您使用此解决方案,不仅因为上面提到的原因,还因为这是我在Postgresql本身没有任何问题的唯一解决方案,许多其他解决方案例如SqlServer都能无缝工作。


2

如其他答案所述,没有内置的解决方案,只有一些辅助工具库(免费和收费)。个人而言,我想出了自己的解决方案。以下是其优点:

  • 免费,易于使用
  • 不需要额外设置映射,它重用来自 DB 本身和 EF DbContext 的元数据
  • 使用动态代码构建以提高性能

使用方法如下:

var uploader = new NpgsqlBulkUploader(context);
var data = GetALotOfData();
uploader.Insert(data);
// OR
uploader.Update(data);

我在这里进行了描述。


1

有一些选项可以批量插入到PostgreSQL中。

例如,在我的库中,我正在使用SQL Copy

COPY TableName (Column1, Column2, Column3) FROM STDIN BINARY

免责声明:我是项目Bulk-Operations.NET的所有者。

这个库使得执行任何类型的批量操作变得非常容易:

  • BulkInsert(批量插入)
  • BulkUpdate(批量更新)
  • BulkDelete(批量删除)
  • BulkMerge(批量合并)

支持多种数据库提供程序,包括PostgreSQL。

// Easy to use
var bulk = new BulkOperation(connection);
bulk.BulkInsert(dt);
bulk.BulkUpdate(dt);
bulk.BulkDelete(dt);
bulk.BulkMerge(dt);

0
上述解决方案需要您指定列数和它们的类型,因此使您的代码表格特定化。如果您的表相对较小,并且具有相同数量的列和相同/兼容的列类型,则可以以通用方式完成。假设您想将Sqlite表迁移到PosgreSql:
// Get data from SqlLite database table
SQLiteConnection sqliteConnection = new SQLiteConnection(new SQLiteConnectionStringBuilder() { DataSource = @"C:\dataBase.sqlite" }.ConnectionString);
sqliteConnection.Open();
var reader = new SQLiteCommand($"SELECT * from table_which_we_want_to_migrate").ExecuteReader();
var dataFromSqliteTable = new DataTable() { CaseSensitive = true };
dataFromSqliteTable.Load(reader);

// Connect to PostgreSql database
var connection = new NpgsqlConnection(new NpgsqlConnectionStringBuilder()
{
    Host = "localhost",
    Port = 5432,
    Database = "DatabaseName",
    Username = "UserName",
    Password = "Password"
}.ToString());
connection.Open();

// Insert every row from the Sqlite table into PostgreSql table
foreach (DataRow row in dataFromSqliteTable.Rows)
{
    // Create an NpgsqlParameter for every field in the column
    var parameters = new List<DbParameter>();
    for (var i = 0; i < dataFromSqliteTable.Columns.Count; i++)
    {
        parameters.Add(new NpgsqlParameter($"@p{i}", row[i]));
    }
    var parameterNames = string.Join(", ", parameters.Select(p => p.ParameterName));
    
    // Create an INSERT SQL query which inserts the data from the current row into PostgreSql table
    var command = new NpgsqlCommand(
        $"INSERT INTO table_which_we_want_to_migrate VALUES ({parameterNames})",
        connection);
    command.Parameters.AddRange(parameters.ToArray());
    command.ExecuteNonQuery();
}

另一种方法是使用命令行工具和通过CSV文件进行导入/导出。这种方式速度更快,即使是大表也能正常工作:

sqlite3 dataBase.sqlite ".output 'temporaryFile.csv.tmp'" ".headers off" ".mode csv" "SELECT * FROM table_which_we_want_to_migrate;" ".quit"
psql --command="\copy table_which_we_want_to_migrate FROM 'temporaryFile.csv.tmp' DELIMITER ',' CSV"

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接