多线程访问数据

3
我有一个应用程序,有三个线程。线程1、2和3。
还有两个数字列表1和2。
线程1将数据插入到列表1中。
线程2从列表1获取数据。
线程2将数据插入到列表2中。
线程3从列表2获取数据。
如果线程同时运行会占用大量CPU。在填充列表2的数据时不需要运行第二个和第三个线程。
如何通知处于列表中的线程新数据已插入,并且需要开始对新数据进行处理?
有什么技术和高效的方法可以实现?
非常感谢。

你需要一个带有信号量的有界缓冲区,你可以在Martin Harvey的出色线程教程中找到一个例子,链接在这里:http://cc.embarcadero.com/Item.aspx?id=14809 - whosrdaddy
4
那是一条管道。获取 OTL 并查看管道示例。 - David Heffernan
正如大卫所说,OTL可能会对你有帮助:http://otl.17slon.com/book/doku.php?id=book:highlevel:pipeline - gabr
2个回答

2
这是一个示例,可以从三个线程中添加list1和list2中的值。
每次将值放入列表中时,都会触发一个事件,并且处理此事件的线程会提取列表中的最后一个值并清除事件标志。
在事件标志被清除之前,不能向列表中放入新值。
中间线程对新值进行中间存储,以避免阻塞第一个线程。
所有事件都可以等待,因此使CPU处于轻松状态。
这些列表是线程安全的。
program Project62;

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  System.Classes,
  System.SyncObjs,
  System.Generics.Collections;

Type
  TMyThread1 = Class(TThread)
    private
      fMySyncAddList : TSimpleEvent;
      fMyList : TThreadList<Integer>;
      fAddVal : Integer;
    public
      Constructor Create(ASyncAddList: TSimpleEvent; AList: TThreadList<Integer>);
      procedure Execute; override;
  End;
  TMyThread2 = Class(TThread)
    private
      fMySyncAddList1,fMySyncAddList2 : TSimpleEvent;
      fMyList1,fMyList2 : TThreadList<Integer>;
      fAddVal : Integer;
    public
      Constructor Create(ASyncAddList1,ASyncAddList2: TSimpleEvent; AList1,AList2 : TThreadList<Integer>);
      procedure Execute; override;
  End;
  TMyThread3 = Class(TThread)
    private
      fMySyncAddList2 : TSimpleEvent;
      fMyList2 : TThreadList<Integer>;
      fAddVal : Integer;
    public
      Constructor Create(ASyncAddList2: TSimpleEvent; AList2 : TThreadList<Integer>);
      procedure Execute; override;
  End;


{ TMyThread1 }

constructor TMyThread1.Create( ASyncAddList : TSimpleEvent; AList: TThreadList<Integer>);
begin
  Inherited Create(false);
  fMySyncAddList := AsyncAddList;
  fMyList := AList;

end;

procedure TMyThread1.Execute;
var
  stateAcknowledged : boolean;
begin
  stateAcknowledged := true;
  while (not Terminated) do
  begin
    if stateAcknowledged then
    begin // Do some work and adda a value to list1
      fAddVal := Random(100);
      fMyList.Add(fAddVal);
      fMySyncAddList.SetEvent; // Signal a new addition
      stateAcknowledged := false;
      //ShowVal;
      Sleep(1000);
    end
    else begin
      stateAcknowledged := (fMySyncAddList.WaitFor(100) <> wrSignaled);
    end;
  end;
end;

{ TMyThread2 }

constructor TMyThread2.Create(ASyncAddList1, ASyncAddList2: TSimpleEvent;
  AList1, AList2: TThreadList<Integer>);
begin
  Inherited Create(false);
  fMySyncAddList1 := AsyncAddList1;
  fMySyncAddList2 := AsyncAddList2;
  fMyList1 := AList1;
  fMyList2 := AList2;    
end;

procedure TMyThread2.Execute;
var
  wr : TWaitResult;
  list : TList<Integer>;
  pulled : Boolean;
begin
  pulled := false;
  while (not Terminated) do
  begin
    if pulled then // Add a value to list2
    begin
      wr := fMySyncAddList2.WaitFor(0);
      if (wr <> wrSignaled) then
      begin
        fMyList2.Add(fAddVal);
        fMySyncAddList2.SetEvent; // Signal a new addition
        pulled := false;
      end
      else Sleep(100);
    end
    else begin // Wait for a new value in list1
      wr := fMySyncAddList1.WaitFor(INFINITE);
      if Terminated then
        Exit;
      if (wr = wrSignaled) then
      begin
        // Pull out the value
        list := fMyList1.LockList;
        try
          fAddVal := list.Last;
        finally
          fMyList1.UnlockList;
        end;
        // All clear
        pulled := true;
        fMySyncAddList1.ResetEvent;
        //ShowVal;
      end;
    end;
  end;
end;

{ TMyThread3 }

constructor TMyThread3.Create(ASyncAddList2: TSimpleEvent;
  AList2: TThreadList<Integer>);
begin
  Inherited Create(false);
  fMySyncAddList2 := AsyncAddList2;
  fMyList2 := AList2;
end;

procedure TMyThread3.Execute;
var
  wr : TWaitResult;
  list : TList<Integer>;
begin
  while not Terminated do
  begin
    wr := fMySyncAddList2.WaitFor(INFINITE);
    if Terminated then
      Exit;
    if (wr = wrSignaled)  then // Wait for signal
    begin
      // Pull out the value
      list := fMyList2.LockList;
      try
        fAddVal := list.Last;
        //ShowVal;
      finally
        fMyList2.UnlockList;
      end;
      // Clear event
      fMySyncAddList2.ResetEvent;
    end;
  end;
end;

var
  list1,list2 : TThreadList<Integer>;
  syncList1,syncList2 : TSimpleEvent;
  thread1 : TMyThread1;
  thread2 : TMyThread2;
  thread3 : TMyThread3;
begin
  list1 := TThreadList<Integer>.Create;
  list2 := TThreadList<Integer>.Create;
  syncList1 := TSimpleEvent.Create(Nil,True,False,'',false);
  syncList2 := TSimpleEvent.Create(Nil,True,False,'',false);
  thread3 := TMyThread3.Create(syncList2,list2);
  thread2 := TMyThread2.Create(syncList1,syncList2,list1,list2);
  thread1 := TMyThread1.Create(syncList1,list1);
  Try
    WriteLn('Press [Enter] key to stop.');
    ReadLn;

  Finally
    thread3.Terminate;
    syncList2.SetEvent; // Wake up call
    thread3.Free;
    thread2.Terminate;
    syncList1.SetEvent; // Wake up call
    thread2.Free;
    thread1.Free;
    syncList1.Free;
    syncList2.Free;
    list1.Free;
    list2.Free;
  End;
end.

添加了一个示例,其中两个TThreadedQueue在线程之间传递信息。线程保留整数的内部列表。正如@DavidHeffernan指出的那样,代码要简单得多。

program Project63;

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  System.Classes,
  System.SyncObjs,
  System.Generics.Collections;

Type
  TMyThread1 = Class(TThread)
    private
      fMyList : TList<Integer>;
      fMyQueue : TThreadedQueue<Integer>;
      fAddVal : Integer;
    public
      Constructor Create(AQueue : TThreadedQueue<Integer>);
      procedure Execute; override;
  End;
  TMyThread2 = Class(TThread)
    private
      fMyList1,fMyList2 : TList<Integer>;
      fMyQueue1,fMyQueue2 : TThreadedQueue<Integer>;
      fAddVal : Integer;
    public
      Constructor Create(AQueue1,AQueue2: TThreadedQueue<Integer>);
      procedure Execute; override;
  End;
  TMyThread3 = Class(TThread)
    private
      fMyList : TList<Integer>;
      fMyQueue : TThreadedQueue<Integer>;
      fAddVal : Integer;
    public
      Constructor Create(AQueue : TThreadedQueue<Integer>);
      procedure Execute; override;
  End;

constructor TMyThread1.Create( AQueue : TThreadedQueue<Integer>);
begin
  Inherited Create(false);
  fMyQueue:= AQueue;
  fMyList := TList<Integer>.Create;
end;

procedure TMyThread1.Execute;
begin
  while (not Terminated) do
  begin
    Sleep(1000); // Simulate some work
    fAddVal := Random(100);
    fMyList.Add(fAddVal);
    fMyQueue.PushItem(fAddVal); // Signal a new addition
  end;
  fMyList.Free;
end;

constructor TMyThread2.Create(AQueue1,AQueue2: TThreadedQueue<Integer>);
begin
  Inherited Create(false);
  fMyQueue1 := AQueue1;
  fMyQueue2 := AQueue2;
  fMyList1 := TList<Integer>.Create;
  fMyList2 := TList<Integer>.Create;
end;

procedure TMyThread2.Execute;
var
  queueSize : Integer;
begin
  while (not Terminated) do
  begin
    if (fMyQueue1.PopItem(queueSize,fAddVal) = wrSignaled) and 
       (not Terminated) then
    begin
      fMyList1.Add(fAddVal);
      // Do some work and send a new value to next thread
      fMyQueue2.PushItem(fAddVal);
      fMyList2.Add(fAddVal);
      
    end;
  end;
  fMyList1.Free;
  fMyList2.Free;
end;

constructor TMyThread3.Create(AQueue : TThreadedQueue<Integer>);
begin
  Inherited Create(false);
  fMyQueue := AQueue;
  fMyList := TList<Integer>.Create;
end;

procedure TMyThread3.Execute;
var
  queueSize : Integer;
begin
  while not Terminated do
  begin
    if (fMyQueue.PopItem(queueSize,fAddVal) = wrSignaled) and 
       (not Terminated) then
    begin
      fMyList.Add(fAddVal);
      // Do some work on list
      
    end;
  end;
  fMyList.Free;
end;

var
  queue1,queue2 : TThreadedQueue<Integer>;
  thread1 : TMyThread1;
  thread2 : TMyThread2;
  thread3 : TMyThread3;
begin
  queue1 := TThreadedQueue<Integer>.Create;
  queue2 := TThreadedQueue<Integer>.Create;
  thread3 := TMyThread3.Create(queue2);
  thread2 := TMyThread2.Create(queue1,queue2);
  thread1 := TMyThread1.Create(queue1);
  Try
    WriteLn('Press [Enter] key to stop.');
    ReadLn;

  Finally
    thread3.Terminate;
    queue2.PushItem(0); // Wake up call
    thread3.Free;
    thread2.Terminate;
    queue1.PushItem(0); // Wake up call
    thread2.Free;
    thread1.Free;
    queue1.Free;
    queue2.Free;
  End;
end.

非常底层。你只需要构建一个线程安全队列,并实例化两个即可。 - David Heffernan
每秒钟醒来10次?为什么? - David Heffernan
消费者可以拉取数据并维护自己的私有列表,其中包含数据的副本。这段代码比实际需要的长度长10倍,并且使用了恶劣的睡眠超时。在管道中,您永远不应该使用sleep或超时。事实上,应该始终避免使用sleep。使用一对线程安全队列就可以轻松解决。 - David Heffernan
@DavidHeffernan,无限睡眠很糟糕吗?制作副本可能有效,但缺少它们如何被处理的细节。 - LU RD
1
复制是很好的,因为它隔离了数据并且免除了同步的需要。共享是大多数线程复杂性的根源。 - David Heffernan
显示剩余2条评论

1

首先想到的是信号量。信号量是一种带有计数器的同步原语:一个线程尝试将计数器减少,如果为零,则该线程被阻塞(即不被调度并且不占用CPU,只是无害地等待)。另一个线程增加计数器,导致被阻塞的线程继续执行。

您可以为每个列表设置一个信号量,因此在读取之前,消费线程会将信号量减少,而生产者线程在写入后会将其增加。

还有另一件事需要注意:当另一个线程正在修改列表时,是否允许另一个线程从列表中获取数据?这取决于如何实现列表(或集合),因此请在文档中查找关于列表“线程安全性”的内容。也许您需要另一个同步原语:互斥锁。线程获取并释放互斥锁,当多个线程尝试锁定互斥锁时,其中一个必须等待。在此处,每个列表使用一个互斥锁,并在持有此互斥锁的情况下进行修改和读取可能是适当的。


1
生产者线程在写入后增加它,不是之前。 - Martin James

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接