在多线程环境下,当有多个连接到同一数据库的情况下,分布式事务会如何表现?

7

我正在尝试确定分布式事务中多个数据库连接的行为。

我有一个长时间运行的进程,它生成一系列线程,每个线程负责管理其自己的DB连接等。所有这些都在事务范围内运行,每个线程通过DependentTransaction对象注册到事务中。

当我尝试并行运行此进程时,遇到了一些问题,主要是似乎存在某种阻塞,防止查询在事务上同时执行。

我想知道的是事务协调器如何处理来自多个连接到同一DB的查询,以及是否建议在线程之间传递连接对象?

我已经读到MS SQL只允许每个事务一个连接,但我显然能够创建和初始化多个连接到同一DB的同一事务中。我只是不能并行执行线程,否则会在打开连接时出现“另一个会话正在使用事务上下文”的异常。结果是连接必须等待执行而不是同时运行,在最终代码完成时没有因为此锁定问题而获得任何净收益。

代码大致如下。

    Sub StartThreads()
        Using Scope As New TransactionScope
            Dim TL(100) As Tasks.Task
            Dim dTx As DependentTransaction
            For i As Int32 = 0 To 100
                Dim A(1) As Object
                dTx = CType(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete), DependentTransaction)
                'A(0) = some_other_data
                A(1) = dTx 'the Dependent Transaction

                TL(i) = Tasks.Task.Factory.StartNew(AddressOf Me.ProcessData, A) 'Start the thread and add it to the array
            Next

            Tasks.Task.WaitAll(TL) 'Wait for threads to finish

            Scope.Complete()
        End Using
    End Sub
    Dim TransLock As New Object
    Sub ProcessData(ByVal A As Object)
        Dim DTX As DependentTransaction = A(1)
        Dim Trans As Transactions.TransactionScope
        Dim I As Int32
        Do While True
            Try
                SyncLock (TransLock)
                    Trans = New Transactions.TransactionScope(DTX, TimeSpan.FromMinutes(1))
                End SyncLock
                Exit Do
            Catch ex As TransactionAbortedException
                If ex.ToString.Contains("Failure while attempting to promote transaction") Then
                ElseIf ex.Message = "The transaction has aborted." Then
                    Throw New Exception(ex.ToString)
                    Exit Sub
                End If
                I += 1
                If I > 5 Then
                    Throw New Exception(ex.ToString)
                End If
            Catch ex As Exception

            End Try
            Thread.Sleep(10)
        Loop
        Using Trans
            Using DALS As New DAC.DALScope
                Do While True
                    Try
                        SyncLock (TransLock)
                            'This opens two connection to the same DB for later use.
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.FirstConnection)
                            DALS.CurrentDAL.OpenConnection(DAC.DAL.ConnectionList.SecondConnection)
                        End SyncLock
                        Exit Do
                    Catch ex As Exception
                        'This is usually where I find the bottleneck
                        '"Transaction context in use by another session" is the exception that I get
                        Thread.Sleep(100)
                    End Try
                Loop

                '*****************
                'Do some work here
                '*****************

                Trans.Complete()
            End Using
        End Using
        DTX.Complete()
    End Sub

编辑

我的测试结果表明这是不可能实现的。即使有多个连接或者使用相同的连接,所有请求都是在事务中顺序处理的。也许他们以后会改变这种行为。


好的,我明白你面临一个困境。但是从 SQL 的角度来看,一个事务必须有限制。在我的看法中,跨多个连接承认一个事务违反了这个概念。我可以要求在我的晚餐上加沙拉,但我不希望下一桌也说他们想和我一起吃沙拉。 - paparazzo
1个回答

9
首先,您需要将关于SQL Server事务的阅读材料分为两个不同的情况:本地和分布式。
本地SQL事务:
- SQL Server仅允许在每个本地事务中执行一个请求。 - 默认情况下,只有一个会话可以参与本地事务。使用sp_getbindtoken和sp_bindsession,多个会话可以参与本地事务。这些会话仍然限制在任何时候只能执行一个请求。 - 使用多个活动结果集(MARS),一个会话可以执行多个请求。所有请求都必须在同一本地事务中注册。
分布式事务:
- 多个会话可以将其本地事务注册到单个分布式事务中。 - 每个会话仍然注册了一个本地事务,受到本地事务中提到的所有限制的约束。 - 在分布式事务协调下,注册在分布式事务中的本地事务受到两阶段提交的影响。 - 在注册在分布式事务中的实例上的所有本地事务仍然是独立的本地事务,主要意味着它们具有冲突的锁名称空间。
因此,当客户端创建.Net TransactionScope并在此事务范围内在同一服务器上执行多个请求时,这些请求都是注册在分布式事务中的本地事务。以下是一个简单的示例:
class Program
    {
        static string sqlBatch = @"
set nocount on;
declare @i int;
set @i = 0;
while @i < 100000
begin
    insert into test (a) values (replicate('a',100));
    set @i = @i+1;
end";

        static void Main(string[] args)
        {
            try
            {
                TransactionOptions to = new TransactionOptions();
                to.IsolationLevel = IsolationLevel.ReadCommitted;
                using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
                {
                    using (SqlConnection connA = new SqlConnection(Settings.Default.connString))
                    {
                        connA.Open();
                        using (SqlConnection connB = new SqlConnection(Settings.Default.connString))
                        {
                            connB.Open();

                            SqlCommand cmdA = new SqlCommand(sqlBatch, connA);
                            SqlCommand cmdB = new SqlCommand(sqlBatch, connB);

                            IAsyncResult arA = cmdA.BeginExecuteNonQuery();
                            IAsyncResult arB = cmdB.BeginExecuteNonQuery();

                            WaitHandle.WaitAll(new WaitHandle[] { arA.AsyncWaitHandle, arB.AsyncWaitHandle });

                            cmdA.EndExecuteNonQuery(arA);
                            cmdB.EndExecuteNonQuery(arB);
                        }
                    }
                    scp.Complete();
                }
            }
            catch (Exception e)
            {
                Console.Error.Write(e);
            }
        }
    }

创建一个虚拟测试表格:
create table test (id int not null identity(1,1) primary key, a varchar(100));

运行我的示例代码,您会发现两个请求同时执行,每个请求都将10万行插入表中,然后在事务范围完成时进行提交。因此,您看到的问题与SQL Server或TransactionScope无关,它们可以轻松处理您描述的情况。此外,代码非常简单直接,不需要创建相关事务、克隆或升级事务。 更新 使用显式线程和相关事务:
 private class ThreadState
    {
        public DependentTransaction Transaction {get; set;}
        public EventWaitHandle Done {get; set;}
        public SqlConnection Connection { get; set; }
    }
    static void Main(string[] args)
    {
        try
        {
            TransactionOptions to = new TransactionOptions();
            to.IsolationLevel = IsolationLevel.ReadCommitted;
            using (TransactionScope scp = new TransactionScope(TransactionScopeOption.Required, to))
            {
                ThreadState stateA = new ThreadState 
                {
                    Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                    Done = new AutoResetEvent(false),
                    Connection = new SqlConnection(Settings.Default.connString),
                };
                stateA.Connection.Open();
                ThreadState stateB = new ThreadState
                {
                    Transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete),
                    Done = new AutoResetEvent(false),
                    Connection = new SqlConnection(Settings.Default.connString),
                };
                stateB.Connection.Open();

                ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateA);
                ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), stateB);

                WaitHandle.WaitAll(new WaitHandle[] { stateA.Done, stateB.Done });

                scp.Complete();

                //TODO: dispose the open connections
            }

        }
        catch (Exception e)
        {
            Console.Error.Write(e);
        }
    }

    private static void Worker(object args)
    {
        Debug.Assert(args is ThreadState);
        ThreadState state = (ThreadState) args;
        try
        {
            using (TransactionScope scp = new TransactionScope(state.Transaction))
            {
                SqlCommand cmd = new SqlCommand(sqlBatch, state.Connection);
                cmd.ExecuteNonQuery();
                scp.Complete();
            }
            state.Transaction.Complete();
        }
        catch (Exception e)
        {
            Console.Error.WriteLine(e);
            state.Transaction.Rollback();
        }
        finally
        {
            state.Done.Set();
        }

    }

感谢您的回复。如果要使用多个线程,您会如何处理呢?在我的场景中,我必须能够运行多个线程,因此事务将在它们之间分布。根据您的帖子,看起来我需要在线程之间传递连接对象。那么这是正确的评估吗? - Middletone
看到我的更新。我无法让依赖事务在线程中打开连接,所以我不得不传递已经打开的连接(并且我假设已经在DTC中注册)。 - Remus Rusanu
我的测试已经明确表明这是不可能完成的。即使有多个连接或者相同的连接在事务中被使用,所有请求或问题也会按顺序进行处理。也许他们将来会改变这种行为。 - Middletone

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接