使用IsolationLevel.ReadUncommitted时,SqlDependency订阅不起作用(与无关的事务有关?)

4
我已经成功地使用SqlDependency,但只有在我不使用与SqlDependency无关的SQL事务IsolationLevel.ReadUncommited时才能实现。
当我在事务中使用IsolationLevel.ReadUncommitted(在下面的注释中重点说明)时,SqlDependency订阅将立即失败,并出现OnChange通知:
sqlNotificationEventArgs.Info = "Isolation";
sqlNotificationEventArgs.Source = "Statement";
sqlNotificationEventArgs.Type = "Subscribe";

当我移除隔离级别时,一切都按预期工作(当然,隔离级别不正确)。

这是我的相关代码:

private static string connString = "the connection string";
[MTAThread]
private static void Main(string[] args)
    while(true)
    {
        using (var context = new LinqDataContext(connString))
        {
            var conn = context.Connection;
            conn.Open();
            /***********************************************************************/
            /* Remove `IsolationLevel.ReadUncommitted` and the SqlDependency works */
            /***********************************************************************/
            using (var trans = conn.BeginTransaction(IsolationLevel.ReadUncommitted))
            {
                // simplified query, the real query uses UPDATE OUTPUT INSERTED
                const string sqlCommand = "SELECT [Columns] FROM dbo.[TABLE] WHERE [Status] = 'ready'";
                results = conn.Query({transaction: trans, sql: sqlCommand});
                trans.Commit();
            }
            DoAwesomeStuffWithTheResults(results, context);
        }
        WaitForWork();
    }
}

与SqlDependency相关的代码:

private static ManualResetEvent _quitEvent = new ManualResetEvent(false);

/// <summary>
/// Sets up a SqlDependency a doesn't return until it receives a Change notification
/// </summary>
private static void WaitForWork(){
    // in case we have dependency running we need to go a head and stop it first. 
    SqlDependency.Stop(connString);
    SqlDependency.Start(connString);

    using (var conn = new SqlConnection(connString))
    {
        using (var cmd = new SqlCommand("SELECT [Status] From dbo.[TABLE]", conn))
        {
            cmd.Notification = null;

            var dependency = new SqlDependency(cmd);
            dependency.OnChange += dependency_OnDataChangedDelegate;

            conn.Open();

            cmd.ExecuteReader();
        }
    }
    _quitEvent.WaitOne();
    SqlDependency.Stop(connString);
}
private static void dependency_OnDataChangedDelegate(object sender, SqlNotificationEventArgs e)
{
    ((SqlDependency)sender).OnChange -= dependency_OnDataChangedDelegate;
    _quitEvent.Set();
}

我觉得在设置SqlDependency之前,我已经妥善处理了上下文、连接和事务,但似乎情况并非如此。

我在这里做错了什么?

2个回答

8
恭喜您成功使用SqlDependency(我一点也不讽刺,许多人都失败了)。
现在是时候阅读MSDN上的创建通知查询主题了。您将看到查询适用于通知的条件,包括此要求:

该语句不能在READ_UNCOMMITTED或SNAPSHOT隔离级别下运行。

我写了关于SqlDependency如何工作的基础知识,也许可以澄清一些误解。另外,由于您正在使用Linq,您可能会对LinqToCache感兴趣,它提供了Linq查询和SqlDependency之间的桥梁。

另一个评论:不要随意启动和停止您的SqlDependency。您很快就会后悔的。应该在应用程序启动期间恰好调用Start()一次,在应用程序关闭期间(严格来说,在应用程序域加载和卸载期间)恰好调用Stop()一次。

关于您的问题:重要的隔离级别是“通知查询”的隔离级别。这意味着,您附加订阅的查询,而不是您执行UPDATE的查询(我不会评论在脏读下进行UPDATE的智慧...或使用脏读进行任何操作的智慧)。据我所知,您展示的代码不应该将查询发布到read_uncommitted下。在发出SET TRANSACTION ISOLATION ...之后,该会话中所有后续事务(即所有语句)都将处于该隔离级别下。您关闭连接(通过DataContext的dispose),然后使用不同的连接。除非...您使用连接池。欢迎加入无辜受害者俱乐部:)。连接池会跨越Close()/Open()边界泄漏隔离级别更改。这就是您的问题。有一些简单的解决方案:

顺便说一下,你还需要阅读这篇文章:将表用作队列


哈哈,谢谢;这是一段艰难的旅程!你是说一旦我使用READ_UNCOMMITTED设置了一个事务,所有后续语句都在该隔离级别下运行吗?或者可能是我没有正确处理事务(使用using应该可以,不是吗?)?(我承认我可能没有完全理解一些核心 SQL 原则) - David Murdoch
在事务中运行SqlDependency命令,并将IsolationLevel设置为“IsolationLevel.Unspecified”,这就是诀窍! - David Murdoch
谢谢您的编辑。我会仔细阅读并相应地更新我的代码。 :-) - David Murdoch

1

以下是基于Remus Rusanu在他的回答中提供的提示进行更新的代码:

private static string connString = "the connection string";
[MTAThread]
private static void Main(string[] args)
    // Start() is supposed to be called exactly once, during app startup
    // and Stop() exactly once during app shutdown:
    SqlDependency.Start(connString);
    AppDomain.CurrentDomain.ProcessExit += delegate
    {
        SqlDependency.Stop(connString);
    };

    while(true) // to infinity, and beyond.
    {
        using (var context = new LinqDataContext(connString))
        {
            var conn = context.Connection;
            // Connection pooling leaks isolation level changes across 
            // Close()/Open() boundaries, use TransactionScope to avoid this.
            using (var scope = CreateTransactionScope(TransactionScopeOption.Required, transactionOptions))
            {
                conn.Open();
                const string sqlCommand = "UPDATE TOP(1) [Table] SET [Status] = 'budy' OUTPUT INSERTED.[Column], */... MORE ...*/ WHERE [Status] = 'ready'";
                results = conn.Query(sqlCommand);
                scope.Complete();
            }
            DoAwesomeStuffWithTheResults(results, context);
        }
        WaitForWork();
    }
}

与SqlDependency相关的代码:

/// <summary>
/// Sets up a SqlDependency and doesn't return until it receives 
/// a Change notification
/// </summary>
private static void WaitForWork(string connString)
{
    var changedEvent = new AutoResetEvent(false);
    OnChangeEventHandler dataChangedDelegate = (sender, e) => changedEvent.Set();
    using (var conn = new SqlConnection(connString))
    {
        using (var scope = Databases.TransactionUtils.CreateTransactionScope())
        {
            conn.Open();
            var txtCmd = "SELECT [FileID] FROM dbo.[File] WHERE [Status] = 'ready'";
            using (var cmd = new SqlCommand(txtCmd, conn))
            {
                var dependency = new SqlDependency(cmd);
                OnChangeEventHandler dataChangedDelegate = null;
                dataChangedDelegate = (sender, e) =>
                {
                    dependency.OnChange -= dataChangedDelegate;
                    changedEvent.Set();
                };
                dependency.OnChange += dataChangedDelegate;
                cmd.ExecuteScalar();
            }
            scope.Complete();
        }
    }
    changedEvent.WaitOne();
    dependency.OnChange -= dependencyOnDataChangedDelegate;
}

新的 TransactionScope 代码:
/// <summary>
/// Using {the default} new TransactionScope Considered Harmful
/// http://blogs.msdn.com/b/dbrowne/archive/2010/06/03/using-new-transactionscope-considered-harmful.aspx
/// </summary>
private static TransactionScope CreateTransactionScope(System.Transactions.IsolationLevel isolationLevel = System.Transactions.IsolationLevel.ReadCommitted)
{
    var transactionOptions = new TransactionOptions
    {
        IsolationLevel = isolationLevel,
        Timeout = TransactionManager.MaximumTimeout
    };
    return new TransactionScope(TransactionScopeOption.Required, transactionOptions);
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接