非Win Form C#应用程序中同步事件处理程序线程的执行

3
我正在运行一些时间和CPU密集型的进程(TimeExpensive类型),一个接一个地进行。主线程(A)在另一个线程(B)中异步启动TimeExpensive进程,并变为不活动状态。完成后,线程B会同步触发调用者的完成处理程序,并在线程B中启动下一个TimeExpensive进程。创建了一个新线程(C),但在启动C之后,B就结束了。因此,对于n个进程,创建n个线程,大多数时间它们不会共存。
有人可能希望以线性单线程方式实现它,但是TimeExpensive是由第三方实现的,当它运行时,它使用所有系统核心并运行数小时。
//This will run as console app 
class Program
{
    static void Main(string[] args)
    {
        new Program().StartJobs();
    }

    void StartJobs()
    {
        Main mainJob = new Main();
        mainJob.MainCompletionEvent += 
                     new Action<object, EventArgs>(mainJob_MainCompletionEvent);
        mainJob.Start();
    }

    void mainJob_MainCompletionEvent(object sender, EventArgs e)
    {
        //if(success) Environment.Exit(0);
    }

}

class Main
{
    int processCounter = 0;
    public event Action<object, EventArgs> MainCompletionEvent;
    public void Start()
    {
        //...do other important tasks here...
        processCounter++;
        TimeExpensive te = new TimeExpensive();
        te.CompletionEvent += new Action(TimeExpensive_CompletionHandler);
        Thread aThread = new Thread(te.Run);
        aThread.IsBackground = false;
        aThread.Name = "TimeExpensive Thread: " + processCounter;
        aThread.Start();
    }

    void TimeExpensive_CompletionHandler()
    {
        Console.WriteLine("current Thread Name: " + Thread.CurrentThread.Name);
        //Start another Process In Background if
        if (processCounter < 5)
        {
            Start();
        }
        else
        {
            Console.ReadKey();
            if (JobCompletionEvent != null)
                JobCompletionEvent(this, new EventArgs());
        }
    }
}

class TimeExpensive
{
    public event Action CompletionEvent;

    public void Run()
    {
        //doing time expensive task
        //...

        //when finish Notify completion Handler...
        if (CompletionEvent != null)
        {
            CompletionEvent();
        }
    }
}

//Output
current Thread Name: TimeExpensive Thread: 1
current Thread Name: TimeExpensive Thread: 2
current Thread Name: TimeExpensive Thread: 3
current Thread Name: TimeExpensive Thread: 4
current Thread Name: TimeExpensive Thread: 5

上面的实现模仿了我所描述的行为。让我感到困扰的是事件处理程序在下一个线程开始之前同步运行,在此期间,它正在执行许多其设计不适合的任务。不确定这是否好用,我是否能够在Thread B的完成处理程序中返回Thread A,或者我应该使用另一个delegate.BeginInvoke来开始事件处理程序的执行?我希望用简单而安全的方法做到这一点。非常感谢您的帮助。
附言:我阅读了很多文章,但没有人处理这种情况。编辑:添加静态主要函数以显示如何在控制台应用程序中启动此代码。请记住,人们也可以创建UI来启动“Main”作业。它肯定会创建BackgroundWorker线程来创建mainJob对象并运行它。谢谢!

它使用所有系统核心。它可能以不寻常的方式实现,但似乎正在执行许多有用的工作?我不认为在OnCompletion处理程序中执行工作是必要的邪恶-它只是另一个函数调用。我不明白为什么A、B、C需要使用不同的线程。为什么不能按顺序运行?无论如何,如果需要进行任何更改,我会放弃线程并提交A、B、C的池任务。 - Martin James
1
使用多核心是一个加分项,特别是当你有专用服务器时。耗时的进程由第三方实现异步运行。我只是使用一个dll。 - Imran Amjad
1
在另一个线程中运行TimeExpensive的另一个优点是间接地对TimeExpensive进行CancelAsync调用。终止正在运行的TE线程,特别是当它正在进行IO操作时是不安全的。 - Imran Amjad
这是一个控制台应用程序吗?如果是,您能展示一下您的Main方法吗? - Nick Butler
@NicholasButler,我刚刚添加了控制台应用程序代码。请查看我的编辑注释。 - Imran Amjad
2个回答

8

在 Thread B 的完成处理程序中,我是否可以返回到 Thread A?

不行,您没有准备好从一个线程调用另一个线程的管道。这种管道由 GUI 应用程序的主线程提供。出于必要,用户界面基本上是线程不安全的。UI 线程有几个实现细节支持此类封送。它在典型的 生产者/消费者 线程模型的实现中扮演消费者的角色。

这需要一个线程安全的队列和一个在消费者中读取队列的循环。您可能会认出这是 Windows GUI 应用程序中的消息队列。使用调用 GetMessage 读取通知并对其进行操作的消息循环。现在,调用的封送很简单,您只需将消息发布到消息队列中,UI 线程将其读取并执行请求。Winforms 中的 Control.BeginInvoke 和 WPF 中的 Dispatcher.BeginInvoke 实现了发布。

你肯定可以自己实现这个同步机制。.NET 4的BlockingCollection类使其变得容易。但是请记住,您必须从根本上改变线程A的执行方式。对于发布到队列中的请求保持响应很重要。这是BackgroundWorker等类试图解决的问题。请记住,GUI消息循环存在是因为它是必要的,UI不是线程安全的。控制台应用程序通常没有同样的负担,控制台是线程安全的。

1
+1 感谢指引我使用 BlockingCollection 并解答我的主要疑问。 - Imran Amjad

0
您所遇到的问题是由于正确处理线程的难度。我使用了Actor编写了您的示例代码:
type Actor<'a> = MailboxProcessor<'a>

type SupMsg = WaitForDone of AsyncReplyChannel<string>
type ProgramState = RunNumber of int * Actor<WorkerMsg> option
and WorkerMsg = Begin of Id * AsyncReplyChannel<string>
and Id = int

let startComputation () = Actor.Start(fun inbox ->
  async { 
    let! Begin(id, chan) = inbox.Receive()
    printfn "Running Computation"
    do! Async.Sleep(20) // zZz
    chan.Reply(sprintf "'%i is done!'" id) })

let sup () = Actor.Start(fun inbox ->
  let rec loop state =
    async {
      match state with
      | RunNumber(_, None) -> return! loop <| RunNumber(1, Some(startComputation ()))
      | RunNumber(run, Some(active)) ->
        let! completed = active.PostAndAsyncReply(fun chan -> Begin(run, chan))
        printfn "sup observed: %s" completed
        let active' = Some(startComputation ())
        if run <> 5 then return! loop <| RunNumber(run + 1, active')
        else return! isDone () }
  and isDone () =
    async {
      let! WaitForDone(chan) = inbox.Receive()
      return chan.Reply("all done") }

  loop <| RunNumber(0, None))

[<EntryPoint>]
let main args =
  printfn "%s" <| (sup ()).PostAndReply(fun chan -> WaitForDone(chan))
  0

输出结果为:

> main();;
Running Computation
sup observed: '1 is done!'
Running Computation
sup observed: '2 is done!'
Running Computation
sup observed: '3 is done!'
Running Computation
sup observed: '4 is done!'
Running Computation
sup observed: '5 is done!'
all done
val it : int = 0

正如您所看到的,跨线程通信变得轻而易举。如果您的库是第三方库,则可以轻松地将Async.Sleep(20)替换为对该库的调用。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接