线程同步队列的最佳实现方法

3

我有一个队列,可以将不同的线程入队,因此我可以保证两件事情:

  1. 请求按顺序一个一个处理。
  2. 请求按照到达的顺序进行处理。

第二点非常重要。否则,仅使用简单的关键段就足够了。 我有不同组的请求,只有在单个组内才需要满足这些条件。来自不同组的请求可以并发运行。

它看起来像这样:

FTaskQueue.Enqueu('MyGroup');
try
  Do Something (running in context of some thread)
finally
  FTaskQueue.Dequeu('MyGroup');
end;

编辑: 我已经删除了实际的实现,因为它隐藏了我想要解决的问题。

我需要这个功能是因为我有一个基于Indy的Web服务器,可以接受HTTP请求。首先,我找到请求对应的会话。然后执行该会话的请求(代码)。我可能会收到同一会话的多个请求(即在第一个请求仍在处理时可能会收到新请求),它们必须按照正确的到达顺序逐一执行。因此,我需要一个通用的同步队列,以便在这种情况下可以将请求排队。我无法控制线程,每个请求可能在不同的线程中执行。

这种情况下最佳(常见)的方法是什么?问题在于Enqueue和Dequeue必须是原子操作,以保留正确的顺序。我的当前实现存在实质性瓶颈,但它可以工作。

编辑: 下面是原子Enqueue / Dequeue操作的问题

你通常会这样做:

procedure Enqueue;
begin
  EnterCriticalSection(FCritSec);
  try
    DoEnqueue;
  finally 
    LeaveCriticalSection(FCritSec);
  end;

  BlockTheCurrentThread; // here the thread blocks itself
end;

procedure Dequeue;
begin
  EnterCriticalSection(FCritSec);
  try
    DoDequeue;
    UnblockTheNextThread; // here the thread unblocks another thread
  finally 
    LeaveCriticalSection(FCritSec);
  end;
end;

现在的问题是这不是原子操作。如果已经有一个线程在队列中,另一个线程调用Enqueue,第二个线程会离开临界区并尝试阻塞自己。现在线程调度程序将恢复第一个线程,它将尝试解除下一个(第二个)线程的阻塞。但第二个线程尚未被阻塞,因此什么也不会发生。现在第二个线程继续并阻塞自己,但这是不正确的,因为它将无法解除阻塞。如果阻塞在临界区内,则临界区永远不会离开,我们就会出现死锁。


如果您有多个线程相互挂起和恢复,以确保任何给定时间只有一个线程执行,则应该意识到您的整个设计是错误的。在这种情况下,请求不应等同于线程。 - mghie
我怀疑这种方法不正确。但是假设我有一个Indy服务器。我在Web服务器事件处理程序中收到HTTP请求。我通过哈希表找到一个会话,然后执行请求=为该会话执行一些代码。现在,如果对于同一个会话,我收到多个请求,它们必须一个接一个地执行。而且每个请求都在不同的线程上下文中。我无法控制线程。如果您知道更好的方法,请将其写成答案。 - Runner
我明白了,但是这些限制在你的问题中并不明显。如果你只阐述问题并询问解决方法,那么你会得到更好的答案。请删除自己解决方案的细节。 - mghie
3个回答

9
另一种方法:
让每个请求线程都有一个手动重置的事件,最初未设置。队列管理器是一个简单的对象,它维护一个线程安全的此类事件列表。Enqueue()和Dequeue()方法都将请求线程的事件作为参数。
type
  TRequestManager = class(TObject)
  strict private
    fCritSect: TCriticalSection;
    fEvents: TList<TEvent>;
  public
    constructor Create;
    destructor Destroy; override;

    procedure Enqueue(ARequestEvent: TEvent);
    procedure Dequeue(ARequestEvent: TEvent);
  end;

{ TRequestManager }

constructor TRequestManager.Create;
begin
  inherited Create;
  fCritSect := TCriticalSection.Create;
  fEvents := TList<TEvent>.Create;
end;

destructor TRequestManager.Destroy;
begin
  Assert((fEvents = nil) or (fEvents.Count = 0));
  FreeAndNil(fEvents);
  FreeAndNil(fCritSect);
  inherited;
end;

procedure TRequestManager.Dequeue(ARequestEvent: TEvent);
begin
  fCritSect.Enter;
  try
    Assert(fEvents.Count > 0);
    Assert(fEvents[0] = ARequestEvent);
    fEvents.Delete(0);
    if fEvents.Count > 0 then
      fEvents[0].SetEvent;
  finally
    fCritSect.Release;
  end;
end;

procedure TRequestManager.Enqueue(ARequestEvent: TEvent);
begin
  fCritSect.Enter;
  try
    Assert(ARequestEvent <> nil);
    if fEvents.Count = 0 then
      ARequestEvent.SetEvent
    else
      ARequestEvent.ResetEvent;
    fEvents.Add(ARequestEvent);
  finally
    fCritSect.Release;
  end;
end;

每个请求线程在队列管理器上调用Enqueue(),然后等待自己的事件被触发。然后它处理请求并调用Dequeue():
{ TRequestThread }

type
  TRequestThread = class(TThread)
  strict private
    fEvent: TEvent;
    fManager: TRequestManager;
  protected
    procedure Execute; override;
  public
    constructor Create(AManager: TRequestManager);
  end;

constructor TRequestThread.Create(AManager: TRequestManager);
begin
  Assert(AManager <> nil);
  inherited Create(TRUE);
  fEvent := TEvent.Create(nil, TRUE, FALSE, '');
  fManager := AManager;
  Resume;
end;

procedure TRequestThread.Execute;
begin
  fManager.Enqueue(fEvent);
  try
    fEvent.WaitFor(INFINITE);
    OutputDebugString('Processing request');
    Sleep(1000);
    OutputDebugString('Request processed');
  finally
    fManager.Dequeue(fEvent);
  end;
end;

{ TForm1 }

procedure TForm1.Button1Click(Sender: TObject);
var
  i: integer;
begin
  for i := 1 to 10 do
    TRequestThread.Create(fRequestManager);
end;

队列管理器在Enqueue()Dequeue()中都锁定事件列表。如果Enqueue()中的列表为空,则设置参数中的事件,否则将其重置。然后将事件附加到列表中。因此,第一个线程可以继续请求,所有其他线程都将被阻塞。在Dequeue()中,事件从列表顶部移除,并设置下一个事件(如果有的话)。
这样,最后一个请求线程将导致下一个请求线程解除阻塞,完全不需要挂起或恢复线程。此解决方案也不需要任何额外的线程或窗口,每个请求线程只需一个事件对象即可。

这正是我最初使用的方法。唯一的区别是,我使用了Suspend / Resume而不是事件,这是次优的(我的方法更好)。但是看看你遇到的问题(我更新了问题)。操作必须是原子的。这就是为什么我引入了额外的线程,以便所有操作都来自另一个线程,并且它们完全是原子的,没有任何关键部分。但是,这样会引入实质性的瓶颈。如果您能解决此问题,那么我将接受您的答案,因为您的答案中其他所有内容已经是最佳的。 - Runner
现在有两个选择。要么我没有看到什么,解决方案很简单,要么我完全在错误的地方寻找答案 :) - Runner
啊,你贴出了代码。是的,我认为这应该可以工作。我知道它很简单,但我错过了某些东西 :) 我会尝试以这种方式实现它。如果它经得起考验,我相信它会的话,那么我将接受你的答案。我不认为它可以做得比这更好。 - Runner
我一定是漏掉了什么。确实有时会有多个请求线程运行,但重要的部分(请求处理)是串行化的。我也看不到任何死锁的风险? - mghie
是的,区别在于事件与挂起/恢复方法。当调用Suspend时,您会立即被挂起,从而出现我所描述的问题。但是,使用事件的方法可以先发出信号并确保它是原子性的,然后再调用WaitFor。这就是我一直缺少的东西。我太专注于我的问题,没有看到这个解决方案。 - Runner
运行得非常好。基本上这就是我一开始想做的,唯一的例外是它能够正确地工作。感谢您的所有帮助。 - Runner

2
我会考虑你在评论中提供的额外信息后进行回答。
如果您有多个需要串行化的线程,则可以利用Windows免费提供的序列化机制。让每个队列成为具有自己窗口和标准消息循环的线程。使用SendMessage()而不是PostThreadMessage(),Windows将负责阻塞发送线程,直到消息被处理,并确保维护正确的执行顺序。通过为每个请求组使用具有自己窗口的线程,您可以确保同时处理多个组。
这是一个简单的解决方案,仅在请求本身可以在与其起源上下文不同的线程上处理时才有效,在许多情况下不应该是问题。

1
请注意,这是将方法调用序列化为STA COM对象所使用的相同原理。 - mghie
1
是的,这可能是一个不错的解决方案,引入一个新线程来监听消息也不是问题。唯一的缺点(除了虚拟窗口)是,这样我需要另一个线程池。我已经从Indy服务器获得了非常好的线程,但这可能是唯一可行的解决方案。让我们等待看看是否有其他方法。 - Runner
请注意,由于您已经有了TTaskQueueThread,因此不需要新线程。您只需让它拥有一个窗口并使用阻塞消息处理即可。 - mghie

0
你尝试过 Delphi 提供的 TThreadList 对象吗?
它是线程安全的,可以为你管理锁。你在主线程之外管理列表,在主线程内部进行操作。
当请求需要新任务时,将其添加到列表中。当一个线程完成时,通过 OnTerminate 事件调用列表中的下一个线程。

是的,我知道TThreadList。问题不在于线程安全列表,我已经解决了那个问题。问题在于如何正确同步FIFO队列。mghie提供的答案完美地解决了这个问题。我的问题在于采用了错误的方法,挂起/恢复线程,而不是使用事件。 - Runner

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接