已经存在一个开放的DataReader,即使它尚未...

6
注意:我已经处理了数百万个问题,其中问题是由于未正确处理读取器/连接或由于懒加载处理不当而导致的错误。我相信这个问题是一个不同的问题,可能与MySQL的.NET连接器有关。
我通过其.NET连接器(6.8.3)广泛使用MySQL服务器(5.6)数据库。出于性能原因,所有表都使用MyISAM引擎创建。我只有一个进程(更新:实际上不是这样,请参见下文),顺序访问DB,因此不需要事务和并发。
今天,在处理多个小时后,以下代码片段:
public IEnumerable<VectorTransition> FindWithSourceVector(double[] sourceVector)
{
    var sqlConnection = this.connectionPool.Take();

    this.selectWithSourceVectorCommand.Connection = sqlConnection;

    this.selectWithSourceVectorCommand.Parameters["@epsilon"].Value
        = this.epsilonEstimator.Epsilon.Min() / 10;

    for (int d = 0; d < this.dimensionality; ++d)
    {
        this.selectWithSourceVectorCommand.Parameters["@source_" + d.ToString()]
        .Value = sourceVector[d];
    }

    // *** the following line (201) throws the exception presented below
    using (var reader = this.selectWithSourceVectorCommand.ExecuteReader())
    {
        while (reader.Read())
        {
            yield return ReaderToVectorTransition(reader);
        }
    }

    this.connectionPool.Putback(sqlConnection);
}

以下异常信息被抛出:
MySqlException: There is already an open DataReader associated with this Connection which must be closed first.
以下是堆栈跟踪的相关部分:
at MySql.Data.MySqlClient.ExceptionInterceptor.Throw(Exception exception) at MySql.Data.MySqlClient.MySqlConnection.Throw(Exception ex) at MySql.Data.MySqlClient.MySqlCommand.CheckState() at MySql.Data.MySqlClient.MySqlCommand.ExecuteReader(CommandBehavior behavior) at MySql.Data.MySqlClient.MySqlCommand.ExecuteReader() at implementation.VectorTransitionsMySqlTable.d__27.MoveNext() in C:\Users\bartoszp...\implementation\VectorTransitionsMySqlTable.cs:line 201 at System.Linq.Enumerable.d__3a`1.MoveNext() at System.Linq.Buffer`1..ctor(IEnumerable`1 source) at System.Linq.Enumerable.ToArray[TSource](IEnumerable`1 source) at implementation.VectorTransitionService.Add(VectorTransition vectorTransition) in C:\Users\bartoszp...\implementation\VectorTransitionService.cs:line 38 at Program.Go[T](Environment`2 p, Space parentSpace, EpsilonEstimator epsilonEstimator, ThresholdEstimator thresholdEstimator, TransitionTransformer transitionTransformer, AmbiguityCalculator ac, VectorTransitionsTableFactory vttf, AxesTableFactory atf, NeighbourhoodsTableFactory ntf, AmbiguitySamplesTableFactory astf, AmbiguitySampleMatchesTableFactory asmtf, MySqlConnectionPool connectionPool, Boolean rejectDuplicates, Boolean addNew) in C:\Users\bartoszp...\Program.cs:line 323 connectionPool.Take 返回满足以下谓词的第一个连接:
private bool IsAvailable(MySqlConnection connection)
{
    var result = false;

    try
    {
        if (connection != null
            && connection.State == System.Data.ConnectionState.Open)
        {
            result = connection.Ping();
        }
    }
    catch (Exception e)
    {
        Console.WriteLine("Ping exception: " + e.Message);
    }

    return result && connection.State == System.Data.ConnectionState.Open;
}

(这与我之前的问题有关,当时我解决了一个不同但类似的问题:MySQL fatal error during information_schema query (software caused connection abort))

FindWithSourceVector 方法由以下代码调用:

var existing
    = this.vectorTransitionsTable
        .FindWithSourceVector(vectorTransition.SourceVector)
        .Take(2)
        .ToArray();

我需要找到最多两个重复的向量 - 这是堆栈跟踪的VectorTransitionService.cs:line 38部分。
现在最有趣的部分是:当调试器在异常发生后停止执行时,我调查了sqlConnection对象,发现它没有与之关联的读取器(如下图)!
这是为什么会发生(显然是“随机”的 - 这个方法在过去的大约20小时内被调用了几乎每分钟一次)?我能避免这种情况吗(除了猜测添加一些睡眠时间,当Ping引发异常时并祈祷它会有帮助)?
关于连接池的实现的其他信息:
Get适用于仅调用简单查询且不使用读取器的方法,因此返回的连接可以以可重入方式使用。 在此示例中,它不直接使用(由于涉及读取器)。
public MySqlConnection Get()
{
    var result = this.connections.FirstOrDefault(IsAvailable);

    if (result == null)
    {
        Reconnect();

        result = this.connections.FirstOrDefault(IsAvailable);
    }

    return result;
}
Reconnect方法只是遍历整个数组并重新创建和打开连接。 Take使用Get,但也从可用连接列表中删除返回的连接,以防在某些方法使用读取器期间调用其他需要连接的方法时进行共享。在这里也不是这种情况,因为FindSourceVector方法很简单(不调用使用DB的其他方法)。然而,出于惯例,Take被用来使用读取器:
public MySqlConnection Take()
{
    var result = this.Get();

    var index = Array.IndexOf(this.connections, result);

    this.connections[index] = null;

    return result;
}

Putback 只是将连接放回到第一个空闲的位置,如果连接池已满,则会忽略该连接:

public void Putback(MySqlConnection mySqlConnection)
{
    int index = Array.IndexOf(this.connections, null);

    if (index >= 0)
    {
        this.connections[index] = mySqlConnection;
    }
    else if (mySqlConnection != null)
    {
        mySqlConnection.Close();
        mySqlConnection.Dispose();
    }
}

@JonSkeet 谢谢您的时间。我已经添加了调用。 - BartoszKP
这是多线程吗?也许是定时器,而进程所需时间超过了间隔时间?看起来像某种竞争条件。 - TyCobb
看起来你写了自己的连接池类。.NET sql provider for MySQL 默认支持自动连接池,你只需要在 MySqlConnection 对象周围使用 using 语句,如果检测到以前使用过相同的连接字符串且超时关闭底层连接,ADO.NET 将自动重用连接。 - Scott Chamberlain
@ScottChamberlain 我之前尝试过这个,但不幸的是也遇到了很多问题。当然,那是几周前的事情,所以我不能保证这不是我的代码有问题,但自从那时起,我的连接池一直运行良好(这就是为什么在图片中你可以看到 pooling=False 部分)- 直到今天... 但如果其他方法都无法帮助,我也会尝试回到内置的连接池,谢谢。 - BartoszKP
4
即使我的回答能够解决问题,你也应该绝对恢复正常的连接池。可以这么说:每天有数百万个程序使用内置的连接池。更有可能是你的代码存在错误,而不是连接池代码存在错误,因此添加更多的代码只会增加更多的错误……看起来情况就是这样。 - Jon Skeet
显示剩余3条评论
2个回答

3

我猜测这可能是问题所在,出现在方法的末尾:

this.connectionPool.Putback(sqlConnection);

您只从迭代器中读取了两个元素,因此除非读取器实际只返回一个值,否则永远无法完成 while 循环。 现在您正在使用 LINQ,它将自动调用迭代器上的 Dispose(),因此您的 using 语句仍将处理读取器,但是您没有将连接放回池中。 如果您在 finally 块中这样做,我认为就可以了:

var sqlConnection = this.connectionPool.Take();
try
{
    // Other stuff here...

    using (var reader = this.selectWithSourceVectorCommand.ExecuteReader())
    {
        while (reader.Read())
        {
            yield return ReaderToVectorTransition(reader);
        }
    }
}
finally
{
    this.connectionPool.Putback(sqlConnection);
}

如果您的连接池是自己实现的,最好让Take返回实现IDisposable接口的内容,并在完成后将连接返回到池中。

以下是一个简短但完整的程序,演示了正在发生的事情,没有涉及任何实际的数据库:

using System;
using System.Collections.Generic;
using System.Linq;

class DummyReader : IDisposable
{
    private readonly int limit;
    private int count = -1;
    public int Count { get { return count; } }

    public DummyReader(int limit)
    {
        this.limit = limit;
    }

    public bool Read()
    {
        count++;
        return count < limit;
    }

    public void Dispose()
    {
        Console.WriteLine("DummyReader.Dispose()");
    }
}

class Test
{    
    static IEnumerable<int> FindValues(int valuesInReader)
    {
        Console.WriteLine("Take from the pool");

        using (var reader = new DummyReader(valuesInReader))
        {
            while (reader.Read())
            {
                yield return reader.Count;
            }
        }
        Console.WriteLine("Put back in the pool");
    }

    static void Main()
    {
        var data = FindValues(2).Take(2).ToArray();
        Console.WriteLine(string.Join(",", data));
    }
}

如文中所述——模拟读者只能找到两个值的情况——输出结果为:

Take from the pool
DummyReader.Dispose()
0,1

请注意,读取器已被处理,但我们从池中没有返回任何内容。如果您更改Main以模拟读取器仅具有一个值的情况,则如下所示:
var data = FindValues(1).Take(2).ToArray();

然后我们完成了整个while循环,因此输出结果发生了变化:
Take from the pool
DummyReader.Dispose()
Put back in the pool
0

我建议您复制我的程序并进行实验。确保您完全了解正在发生的一切...然后您可以将其应用于自己的代码中。您可能需要阅读我的文章《迭代器块实现细节》


@BartoszKP:是的,using语句有一个finally语句 - 所以它调用了reader.Dispose(),但那是不同的。如果有超过2个结果,那么最后一个yield之后的代码在您的情况下将不会被执行 - Take方法将处理迭代器而不请求第三个结果,因此在while循环之后的代码没有执行的理由。阅读器只会关闭,因为它实际上处于finally块中。 - Jon Skeet
@BartoszKP:是的,读取器正在被处理 - 但我不认为这是问题所在。我认为bug实际上是您没有将连接返回到池中。我怀疑这会显示出连接池中的错误 - 完全可能是MySqlException中诊断消息中的错误。没有看到更多的连接池代码很难判断,但我建议您在那里添加有关您正在返回的内容的诊断信息等。 - Jon Skeet
@BartoszKP:我建议你不要把这个问题添加到这里,而是花点时间诊断一下它的问题,如果需要的话,提出一个的问题,并附上连接池代码。虽然如我之前所提到的,如果我是你,我会放弃它 :) - Jon Skeet
@BartoszKP:你是否曾经同时“打开”多个迭代器块?例如,在另一个方法中调用一个方法?如果是这样,那么这可能会解释你的Reconnect方法的工作方式。但实际上,我需要逐行进行步进以确保。 - Jon Skeet
好吧...我有点尴尬 :) 原来虽然这并没有提供答案,但当然不是你的错,因为问题中缺少关键细节,而我忘了。关于问题可能在池的实现中的直觉完全正确,所以我接受这个答案,并将我的答案与特定的解释一起添加。然而,整个讨论似乎不太可能对未来的任何人有帮助。我应该删除它吗?你怎么想? - BartoszKP
1
@BartoszKP:您无法删除已获赞答案的问题。我认为这不会造成任何伤害。不过,我很高兴您找到了问题的原因 :) - Jon Skeet

0

TyCobbJon Skeet正确猜测,问题出在池实现和多线程上。我忘记了实际上我在Reconnect方法中启动了一些微小的Task。第一个连接是同步创建和打开的,但所有其他连接都是异步打开的。

我的想法是因为我只需要一个连接,所以其他连接可以在不同的线程中重新连接。然而,由于我并没有总是把连接放回去(如Jon's answer中所解释的),重新连接发生得非常频繁,而且由于系统负载很重,这些重新连接的线程不够快,最终导致了竞争条件。修复方法是更简单、直接地重新连接:

private void Reconnect()
{
    for (int i = 0; i < connections.Length; ++i)
    {
        if (!IsAvailable(this.connections[i]))
        {
            this.ReconnectAt(i);
        }
    }
}

private void ReconnectAt(int index)
{
    try
    {
        this.connections[index] = new MySqlConnection(this.connectionString);
        this.connections[index].Open();
    }
    catch (MySqlException mse)
    {
        Console.WriteLine("Reconnect error: " + mse.Message);
        this.connections[index] = null;
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接