我可以使用Parallel.For和SQL命令吗?

4
我有一个范围的课程。
public class avl_range
{
    public long start { get; set; }
    public long end { get; set; }
}

如果我使用普通的FOR循环,一切都很完美,但必须等待每个命令完成,每个查询需要8秒钟,因此10个查询需要80秒钟。
在并行版本中,如果只打印范围,一切都很完美,但是如果尝试执行该命令,则会显示已经在进行中。
“已经有一个操作正在进行中。”
我该如何解决这个问题?
var numbers = new List<avl_range>();
using (var conn = new NpgsqlConnection(strConnection))
    {
        conn.Open();

        Action<avl_range> forEachLoop = number => //Begin definition of forLoop
        {
             // only the console write line works ok
            Console.WriteLine(number.start + " - " + number.end);

            using (var cmd = new NpgsqlCommand())
            {
                cmd.Connection = conn;                            
                cmd.CommandText = String.Format( "SELECT * FROM avl_db.process_near_link({0}, {1});"
                                                 , number.start
                                                 , number.end);
                // here cause the error.
                using (var reader = cmd.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        Console.WriteLine(reader.GetString(0));
                    }
                }
            }
        };

        Parallel.ForEach(numbers, forEachLoop);
    }
 );

FYI:我正在尝试解决这个问题,我之前已经发布了它 在此之前

在微软的 SQL Server 中,这会导致错误,因为您不能在同一连接上拥有多个打开的记录集(除非启用 MARS)。我不知道您的 SQL 提供程序是否以相同的方式实现,但我猜测很可能如此。 - Bradley Uffner
在并行 foreach 中创建一个新的 NpgsqlConnection 怎么样? - Botond Botos
@BradleyUffner,你能提供一些文档吗?我可以尝试,但是为每个查询创建不同的连接对我来说似乎不合逻辑。 - Juan Carlos Oropeza
这在微软的SQL实现中应该可以工作。 我无法确定在使用过的postressql中是否有效,但还是值得一试的。 只要它汇集连接,就不应该有太多性能损失。 - Bradley Uffner
这个 https://dev59.com/8V_Va4cB1Zd3GeqPQB3B 看起来表明PostgreSQL没有MARS等效功能。 - Bradley Uffner
@botond.botos 谢谢,这个方法可行,但不确定这是否是正确的做法。 - Juan Carlos Oropeza
2个回答

4

Npgsql连接不能同时使用 - 在任何给定时间只能运行一个命令(换句话说,不支持MARS)。

在并行执行查询时,打开多个连接肯定是有意义的。虽然建立新的物理连接很昂贵,但连接池非常轻量级,因此重用物理连接的开销非常小。不这样做的主要原因是如果您需要将多个操作放在同一事务中。


您确定Npgsql实现了连接池吗?这取决于ADO.NET提供程序是否想要支持。老实说,我不知道他们是否支持。 - Scott Chamberlain
1
是的,我就是写这个的那个人 :) - Shay Rojansky
嗨,Shay,我是 Npgsql 论坛上的 dropyghost :)。我刚刚把连接创建放在了 forEachLoop 的内部,现在可以正常工作了。这样做可以吗?或者我应该做些其他的事情,因为你提到过重用连接? - Juan Carlos Oropeza
@JuanCarlosOropeza 连接的重复使用在幕后自动进行,唯一需要注意的是确保两个新连接对象之间的连接字符串完全相同。 - Scott Chamberlain
1
@JuanCarlosOropeza 如果你的代码只是写入控制台,那可能是导致速度变慢的部分。控制台是最慢的部分。尝试使用普通的foreach循环而不是并行循环,但将字符串存储在列表中而不是写入控制台,并注释掉Console.WriteLine(number.start + " - " + number.end);,然后告诉我们你得到了什么时间。 - Scott Chamberlain
显示剩余3条评论

3
即使你能让它与MARS一起工作,连接对象几乎永远不是线程安全的,你需要每个线程一个连接。Parallel.ForEach有超载函数可以轻松实现这一点,这些函数在线程开始和结束时运行
var numbers = new List<avl_range>();

Func<NpgsqlConnection> localInit => () => 
{
    var conn = new NpgsqlConnection(strConnection);
    conn.Open();
};

Action<NpgsqlConnection> localFinally = (conn) => conn.Dispose();

Func<avl_range, ParallelLoopState, NpgsqlConnection, NpgsqlConnection> forEachLoop = (number, loopState, conn) => //Begin definition of forLoop
{
     // only the console write line works ok
    Console.WriteLine(number.start + " - " + number.end);

    using (var cmd = new NpgsqlCommand())
    {
        cmd.Connection = conn;                            
        cmd.CommandText = String.Format( "SELECT * FROM avl_db.process_near_link({0}, {1});"
                                         , number.start
                                         , number.end);
        // here cause the error.
        using (var reader = cmd.ExecuteReader())
        {
            while (reader.Read())
            {
                Console.WriteLine(reader.GetString(0));
            }
        }
    }
    return conn;
};

Parallel.ForEach(numbers, localInit, forEachLoop, localFinally);

话虽如此,大多数情况下并发连接到数据库并不是正确的想法,瓶颈很可能在其他地方,您应该使用分析器来查看真正拖慢程序的原因并集中精力解决。


评论样例代码:

var numbers = GetDataForNumbers();
List<string> results = new List<string>();

Func<List<string>> localInit => () => new List<string>();

Func<avl_range, ParallelLoopState, List<string>, List<string>> forEachLoop = (number, loopState, localList) => //Begin definition of forLoop
{
    using (var conn = new NpgsqlConnection(strConnection))
    {
        conn.Open();

        //This line is going to slow your program down a lot, so i commented it out.
        //Console.WriteLine(number.start + " - " + number.end);

        using (var cmd = new NpgsqlCommand())
        {
            cmd.Connection = conn;                            
            cmd.CommandText = String.Format( "SELECT * FROM avl_db.process_near_link({0}, {1});"
                                             , number.start
                                             , number.end);
            using (var reader = cmd.ExecuteReader())
            {
                while (reader.Read())
                {
                    //Add a object to the thread local list, we don't need to lock here because we are the only thread with access to it.
                    localList.Add(reader.GetString(0));
                }
            }
        }
    }
    return localList;
};

Action<List<String>> localFinally = localList => 
{
    //Combine the local list to the main results, we need to lock here as more than one thread could be merging at once.
    lock(results)
    {
        results.AddRange(localList);
    }
};

Parallel.ForEach(numbers, localInit, forEachLoop, localFinally);

//results now contains strings from all the threads here.

谢谢Scott,这种设置和将创建连接放在内部有什么区别吗? - Juan Carlos Oropeza
如果NpgsqlConnection不支持此连接池方式,则利用这种方式会更快,因为它会创建较少的总连接(单个线程可以重用多次循环体)。但是由于Shay确认了它支持连接池,我宁愿将其放在foreach体内,因为这样可以使代码更简单易读,并且具有完全相同的性能。所有这些代码所做的就是通过每个线程拥有一个连接来创建新的“池”效应,因为提供程序拥有自己的池,所以我们不需要自己创建。 - Scott Chamberlain
一个常见的用例是在foreach之外创建一个对象,比如var combinedList = new List<T>,然后在localinit中也执行return new List<T>并在循环体中向该本地列表写入内容。 在你的localFinally中,通过localList => { lock(combinedList) { combinedList.AddRange(localList); } }以线程安全的方式合并各个线程的列表到主列表中,这样循环体中就没有锁,唯一需要锁定的地方就在localFinally中。 - Scott Chamberlain
@JuanCarlosOropeza 我修改了你的代码,并创建了一个使用List<String>的示例,详见我的更新答案。 - Scott Chamberlain
注释掉的 console.log 并没有改变时间,看起来他们对此进行了优化。接下来的问题是:在 Parallel.For 后我只留下一个注释,然后出现了几条关于 线程结束 的信息。我应该添加一些内容等待这些线程结束吗?或者直接跟随其他任务也可以吗? - Juan Carlos Oropeza
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接