如何使用新线程库中的TTask.WaitForAny?

7

为了尝试在Delphi中使用线程库并行计算任务,并使用TTask.WaitForAny()获取第一个计算结果,有时会发生异常导致执行停止。

异常时的调用堆栈:

在$752D2F71处出现首次机会异常。异常类EMonitorLockException,消息为“Object lock not owned”。进程Project1.exe (11248)

:752d2f71 KERNELBASE.RaiseException + 0x48
System.TMonitor.CheckOwningThread
System.ErrorAt(25,$408C70)
System.Error(reMonitorNotLocked)
System.TMonitor.CheckOwningThread
System.TMonitor.Exit
System.TMonitor.Exit($2180E40)
System.Threading.TTask.RemoveCompleteEvent(???)
System.Threading.TTask.DoWaitForAny((...),4294967295)
System.Threading.TTask.WaitForAny((...))
Project9.Parallel2
Project9.Project1
:74ff919f KERNEL32.BaseThreadInitThunk + 0xe
:7723b54f ntdll.RtlInitializeExceptionChain + 0x8f
:7723b51a ntdll.RtlInitializeExceptionChain + 0x5a

调用栈表明异常是由线程库中的一个 bug 或者 TMonitor 和/或 TTask.WaitForAny() 引起的。为了验证这一点,代码被简化到最小限度:

program Project1;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Threading, System.Classes, System.SyncObjs,
  System.StrUtils;
var
  WorkerCount : integer = 1000;

function MyTaskProc: TProc;
begin
  result := procedure
    begin
      // Do something
    end;
end;

procedure Parallel2;
var
  i : Integer;
  Ticks: Cardinal;
  tasks: array of ITask;
  LTask: ITask;
  workProc: TProc;
begin
  workProc := MyTaskProc();
  Ticks := TThread.GetTickCount;
  SetLength(tasks, WorkerCount); // number of parallel tasks to undertake
  for i := 0 to WorkerCount - 1 do // parallel tasks
    tasks[i] := TTask.Run(workProc);
  TTask.WaitForAny(tasks); // wait for the first one to finish
  for LTask in tasks do
    LTask.Cancel; // kill the remaining tasks
  Ticks := TThread.GetTickCount - Ticks;
  WriteLn('Parallel time ' + Ticks.ToString + ' ms');
end;

begin
  try
    repeat
      Parallel2;
      WriteLn('finished');
    until FALSE;
  except
    on E: Exception do
      writeln(E.ClassName, ': ', E.Message);
  end;
  Readln;
end.

现在,该错误在一段时间后就会重现,RTL错误已经得到确认。

这被提交到Embarcadero作为RSP-10197 TTask.WaitForAny gives exception EMonitorLockException "Object lock not owned"


考虑到目前使用Delphi线程库无法解决此问题,问题是:

是否有一种解决方法可以并行执行一个过程以获得第一个获得的解决方案?


2
@JimmyDean 你不应该重新提问 - 你应该回到之前的问题并用新信息进行编辑 - J...
1
此外,这种类型的问题(例如:“我的代码是否已经优化好?请检查。”)完全不属于讨论范围。除了极少数例外情况,代码几乎总是可以进一步优化的,这个问题非常开放式的。在这种情况下,代码已经非常混乱,因此全面回答这个问题将是一个非常巨大的任务。你在上一个问题中获得了丰富的建议,但你的“修订”代码似乎忽略了很多或大部分建议。从小事做起,逐步学习每一个细节——没有计划地跳入一个复杂的问题永远不会有好结果。 - J...
1
我仍然看到这段代码中存在许多违规行为,而你在上一次提问的迭代中已经被告知要避免这些问题。具体来说,有大量全局变量、从已知不支持多线程的函数中调用函数以及可怕的缩进。现在谈论优化这段代码还为时过早。优化只对正确的代码有意义,而你的代码还远远没有达到正确的水平。 - Rob Kennedy
1
@JimmyDean,与你认为这个网站的规则“短视和琐碎”相反:这些规则是多年来演变而来的,旨在保持这个网站作为一个高质量的问答参考。你提出的问题不符合该网站的目标,你试图创建只有微小差异的重复问题也不符合该网站的目标。如果你遵循该网站的指南,我相信你会发现它是一个非常有价值的资源。随着时间的推移,你会明白为什么一些看似严厉的规则存在。 - Disillusioned
2
@DavidHeffernan,谢谢。我将代码削减到最小可重现的示例。我会提交一个QC。显然,TMonitor或TTask.WaitForAny中存在错误。 - LU RD
显示剩余20条评论
1个回答

4

这里有一个示例,使用TParallel.For来在产生答案时停止执行。它使用TParallel.LoopState来通知并停止并行循环的其他成员。通过使用.Stop信号,所有当前和挂起的迭代都应该停止。当前迭代应检查loopState.Stopped

procedure Parallel3(CS: TCriticalSection);
var
  Ticks: Cardinal;
  i,ix: Integer;  // variables that are only touched once in the Parallel.For loop
begin
  i := 0;
  Ticks := TThread.GetTickCount;
  TParallel.For(1,WorkerCount,
    procedure(index:Integer; loopState: TParallel.TLoopState)
    var
      k,l,m: Integer;
    begin
      // Do something complex
      k := (1000 - index)*1000;
      for l := 0 to Pred(k) do
        m := k div 1000;
      // If criteria to stop fulfilled:
      CS.Enter;
      Try
        if loopState.Stopped then // A solution was already found
          Exit;
        loopState.Stop;  // Signal 
        Inc(i);
        ix := index;
      Finally
        CS.Leave;
      End;
    end
  );
  Ticks := TThread.GetTickCount - Ticks;
  WriteLn('Parallel time ' + Ticks.ToString + ' ticks', ' i :',i,' index:',ix);
end;

临界区保护计算结果,这里为了简单起见是 i, ix。
免责声明:考虑到 System.Threading 库中存在大量的错误,我建议使用 OTL 框架进行另一种解决方案。至少在库达到稳定基础之前如此。

我正在尝试使用OTL框架来复现您提出的解决方案。在OTL中,LoopState的等效物是什么?我是否需要使用Stopped和cancellationToken的组合?谢谢。 - Jimmy Dean
使用cancelToken.Signal来设置,使用cancelToken.IsSignaled来测试。Parallel.ForEach(1,WorkerCount) .CancelWith(cancelToken) .Execute(procedure( const index: integer) begin ... end); - LU RD

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接