TThreadedQueue是否不支持多个消费者?

44

尝试在单生产者多消费者方案中使用TThreadedQueue(Generics.Collections)(Delphi-XE)。想法是将对象推入队列,让多个工作线程排空队列。

但是,它并没有按预期工作。当两个或更多工作线程调用PopItem时,TThreadedQueue会抛出访问冲突。

如果使用关键段对PopItem的调用进行串行化,则一切正常。

显然,TThreadedQueue应该能够处理多个消费者,那么我是否遗漏了什么,还是这是TThreadedQueue中的纯错误?

这里有一个简单的示例可以产生错误。

program TestThreadedQueue;

{$APPTYPE CONSOLE}

uses
//  FastMM4 in '..\..\..\FastMM4\FastMM4.pas',
  Windows,
  Messages,
  Classes,
  SysUtils,
  SyncObjs,
  Generics.Collections;

type TThreadTaskMsg =
       class(TObject)
         private
           threadID  : integer;
           threadMsg : string;
         public
           Constructor Create( ID : integer; const msg : string);
       end;

type TThreadReader =
       class(TThread)
         private
           fPopQueue   : TThreadedQueue<TObject>;
           fSync       : TCriticalSection;
           fMsg        : TThreadTaskMsg;
           fException  : Exception;
           procedure DoSync;
           procedure DoHandleException;
         public
           Constructor Create( popQueue : TThreadedQueue<TObject>;
                               sync     : TCriticalSection);
           procedure Execute; override;
       end;

Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>;
                                  sync     : TCriticalSection);
begin
  fPopQueue:=            popQueue;
  fMsg:=                 nil;
  fSync:=                sync;
  Self.FreeOnTerminate:= FALSE;
  fException:=           nil;

  Inherited Create( FALSE);
end;

procedure TThreadReader.DoSync ;
begin
  WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;

procedure TThreadReader.DoHandleException;
begin
  WriteLn('Exception ->' + fException.Message);
end;

procedure TThreadReader.Execute;
var signal : TWaitResult;
begin
  NameThreadForDebugging('QueuePop worker');
  while not Terminated do
  begin
    try
      {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
      Sleep(20);
      {- Serializing calls to PopItem works }
      if Assigned(fSync) then fSync.Enter;
      try
        signal:= fPopQueue.PopItem( TObject(fMsg));
      finally
        if Assigned(fSync) then fSync.Release;
      end;
      if (signal = wrSignaled) then
      begin
        try
          if Assigned(fMsg) then
          begin
            fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>';
            fMsg.Free; // We are just dumping the message in this test
            //Synchronize( Self.DoSync);
            //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
          end;
        except
          on E:Exception do begin
          end;
        end;
      end;
      except
       FException:= Exception(ExceptObject);
      try
        if not (FException is EAbort) then
        begin
          {Synchronize(} DoHandleException; //);
        end;
      finally
        FException:= nil;
      end;
   end;
  end;
end;

Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);
begin
  Inherited Create;

  threadID:= ID;
  threadMsg:= msg;
end;

var
    fSync : TCriticalSection;
    fThreadQueue : TThreadedQueue<TObject>;
    fReaderArr : array[1..4] of TThreadReader;
    i : integer;

begin
  try
    IsMultiThread:= TRUE;

    fSync:=        TCriticalSection.Create;
    fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);
    try
      {- Calling without fSync throws exceptions when two or more threads calls PopItem
         at the same time }
      WriteLn('Creating worker threads ...');
      for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);
      {- Calling with fSync works ! }
      //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);
       WriteLn('Init done. Pushing items ...');

      for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

      ReadLn;

    finally
      for i:= 1 to 4 do fReaderArr[i].Free;
      fThreadQueue.Free;
      fSync.Free;
    end;

  except
    on E: Exception do
      begin
        Writeln(E.ClassName, ': ', E.Message);
        ReadLn;
      end;
  end;
end.

更新:在Delphi XE2中已修复导致TThreadedQueue崩溃的TMonitor中的错误。

更新2:上述测试在队列为空状态下对其进行了压力测试。Darian Miller发现在满状态下对队列进行压力测试,仍然可以在XE2中重现该错误。错误再次出现在TMonitor中。有关更多信息,请参见他下面的答案。还有一个QC101114的链接。

更新3: 在Delphi-XE2更新4中,宣布修复了TMonitor中的问题,这将解决TThreadedQueue中的问题。到目前为止,我的测试无法再现TThreadedQueue中的任何错误。 测试了空队列和满队列时的单生产者/多消费者线程。 还测试了多个生产者/多个消费者。我将读取器线程和写入器线程从1变化到100,没有任何故障。但是,考虑到历史,我敢挑战其他人来打破TMonitor


4
你好,LU RD!欢迎来到StackOverflow。这是一个不错的问题,但如果代码稍微贴出一点不同可能会更容易测试。你只包含了一个窗体的.pas部分,没有对应的DFM文件,这使得我们难以复制和调查。问题似乎与UI无关,所以有没有办法将其简化为控制台应用程序?谢谢。 - Mason Wheeler
梅森,控制台应用完成。 - LU RD
1
XE2仍存在问题... - Darian Miller
1
XE2 更新4修复了这些问题。 - Darian Miller
1
请查看@DarianMiller的博客文章重新审视Delphi中的TThreadedQueue和TMonitor,了解TMonitorTThreadedQueue的最新状态。 - LU RD
5个回答

19

很难确定是否需要进行大量测试,但肯定看起来这是一个bug,不管是在TThreadedQueue中还是在TMonitor中。 无论哪种方式,这都是RTL中的问题而不是您的代码。 您应该将此视为QC报告并使用上面的示例作为“如何重现”代码。


Mason,谢谢。除非其他人有不同的意见,否则我明天将进行质量控制。看起来错误出在TMonitor上。 - LU RD
7
QC #91246 TThreadedQueue在多个消费者下失败。如果您喜欢,请为其投票。 - LU RD
6
以下是QC报告链接:http://qc.embarcadero.com/wc/qcmain.aspx?d=91246。 - jachguate
3
似乎没有被固定 - 添加了一个社区维基回答,并对生成XE2中的AV的示例代码进行了微调 - Darian Miller
有人知道这个错误是如何被修复的,以及如何将修复应用于XE3吗? - Nick Hodges

10

1
我熟悉OmniThreadLibrary,也了解Andreas Hausladen的AsyncCalls。他的博客地址是http://andy.jgknet.de/blog/bugfix-units/asynccalls-29-asynchronous-function-calls/。 - LU RD

4

您的示例在XE2下似乎运行良好,但是如果我们填充队列,则在PushItem上失败并出现AV。(在XE2 Update1下测试)

要复制,请将任务创建从100增加到1100(您的队列深度设置为1024)

for i:= 1 to 1100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

每次在Windows 7上都会出现问题。我最初尝试了持续推送以进行压力测试,但它在第30个循环时失败......然后在第16个循环时失败......然后在65个循环时失败,因此在不同的间隔时间内它始终会在某个时刻失败。

  iLoop := 0;
  while iLoop < 1000 do
  begin
    Inc(iLoop);
    WriteLn('Loop: ' + IntToStr(iLoop));  
    for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
  end;

我偶然发现了Gurock的这个链接,它可以轻松地转换TThreadedQueue,避免了AV错误。http://blog.gurock.com/software/win32-condition-variables-and-monitors-for-delphi/ 将类型从TObject更改为Gurock版本:FQueueNotEmpty、FQueueNotFull更改为TCondition,FQueueLock更改为TLock;(并更改调用约定)我使用这些更改后无法重复TThreadedQueue中的错误。 - Darian Miller
1
@LURD - 我测试了博客上发布的最新更改,TThreadedQueue 仍然会崩溃。使用原始示例代码,添加一个执行上面 PushItem 代码的新 TThreadWriter,并创建多个写入线程。(基本上是进一步增强功率。)在 Chris Rolliston 的博客上留下了带有我的测试参数的评论。好消息是,Allen Bauer 在同一篇博客文章中发表评论,表示另一个修复程序将在下一个更新中推出。 - Darian Miller
1
Darian,太好了,我也复制了这个。不过我在Chris的博客上没有看到你的任何评论。希望EMB的更新能处理“多生产者/多消费者”的情况。 - LU RD
好的,现在我可以在博客中看到你的回答。 - LU RD
1
我使用XE2更新4进行了一些测试,无法再次破坏TThreadedQueue。尝试了多个生产者/多个消费者的许多组合。 - LU RD
显示剩余4条评论

3
我在D2009中寻找TThreadedQueue类,但似乎没有。我并不会因此而崩溃——Delphi线程支持一直以来都不是很好,我怀疑TThreadedQueue也不例外。
为什么要使用泛型的P-C(生产者/消费者)对象?一个简单的TObjectQueue子类就可以胜任——我已经使用了几十年,可以与多个生产者/消费者一起正常工作。
unit MinimalSemaphorePCqueue;

{ Absolutely minimal P-C queue based on TobjectQueue and a semaphore.

The semaphore count reflects the queue count
'push' will always succeed unless memory runs out, then you're stuft anyway.
'pop' has a timeout parameter as well as the address of where any received
object is to be put.
'pop' returns immediately with 'true' if there is an object on the queue
available for it.
'pop' blocks the caller if the queue is empty and the timeout is not 0.
'pop' returns false if the timeout is exceeded before an object is available
from the queue.
'pop' returns true if an object is available from the queue before the timeout
is exceeded.
If multiple threads have called 'pop' and are blocked because the queue is
empty, a single 'push' will make only one of the waiting threads ready.


Methods to push/pop from the queue
A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call.
When the handle is signaled, the 'peek' method will retrieve the queued object.
}
interface

uses
  Windows, Messages, SysUtils, Classes,syncObjs,contnrs;


type

pObject=^Tobject;


TsemaphoreMailbox=class(TobjectQueue)
private
  countSema:Thandle;
protected
  access:TcriticalSection;
public
  property semaHandle:Thandle read countSema;
  constructor create; virtual;
  procedure push(aObject:Tobject); virtual;
  function pop(pResObject:pObject;timeout:DWORD):boolean;  virtual;
  function peek(pResObject:pObject):boolean;  virtual;
  destructor destroy; override;
end;


implementation

{ TsemaphoreMailbox }

constructor TsemaphoreMailbox.create;
begin
{$IFDEF D2009}
   inherited Create;
{$ELSE}
  inherited create;
{$ENDIF}
  access:=TcriticalSection.create;
  countSema:=createSemaphore(nil,0,maxInt,nil);
end;

destructor TsemaphoreMailbox.destroy;
begin
  access.free;
  closeHandle(countSema);
  inherited;
end;

function TsemaphoreMailbox.pop(pResObject: pObject;
  timeout: DWORD): boolean;
// dequeues an object, if one is available on the queue.  If the queue is empty,
// the caller is blocked until either an object is pushed on or the timeout
// period expires
begin // wait for a unit from the semaphore
  result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout));
  if result then // if a unit was supplied before the timeout,
  begin
    access.acquire;
    try
      pResObject^:=inherited pop; // get an object from the queue
    finally
      access.release;
    end;
  end;
end;

procedure TsemaphoreMailbox.push(aObject: Tobject);
// pushes an object onto the queue.  If threads are waiting in a 'pop' call,
// one of them is made ready.
begin
  access.acquire;
  try
    inherited push(aObject); // shove the object onto the queue
  finally
    access.release;
  end;
  releaseSemaphore(countSema,1,nil); // release one unit to semaphore
end;

function TsemaphoreMailbox.peek(pResObject: pObject): boolean;
begin
  access.acquire;
  try
    result:=(count>0);
    if result then pResObject^:=inherited pop; // get an object from the queue
  finally
    access.release;
  end;
end;
end.

谢谢你的回答。我在XE文档中看到了TThreadedQueue类,并为我所拥有的一个真实应用程序进行了简单的测试。这是我第一次尝试泛型,但结果并不理想。正如您从其他评论中所看到的那样,错误出现在TMonitor类中,如果有人构建并行多线程应用程序,则会产生影响。我的实现最终使用了一个简单的队列,通过关键部分来保护推送和弹出操作。 - LU RD

1

我认为TThreadedQueue不支持多个消费者。根据帮助文件,它是先进先出的。我的印象是有一个线程在推送,另一个线程(只有一个!)在弹出。


8
FIFO只是指队列如何被清空,这并不意味着只能有一个线程从队列中取出作业,特别是当它被称为ThreadedQueue时。 - The_Fox
2
它被称为ThreadedQueue,因为推入(push)和弹出(pop)可以在不同的线程中进行。在多线程世界中,没有什么是免费的,因此我认为如果支持多个生产者和/或消费者,文档中应该会提到。但是它没有被提到,所以我认为它不应该工作。 - Giel
3
这个队列由监视器保护。监视器本身必须在多线程环境中是安全的。如果这个队列不适合多个消费者使用,它至少应该抛出一个可以捕捉的异常。 - LU RD
1
@LU RD:请注意,在 TThreadedQueue<T>.PopItem 的末尾有一行代码 "TMonitor.Pulse(FQueueNotFull);"。它没有受到你提到的监视器的保护。我猜这可能是导致问题的原因。 - Giel

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接