后台线程循环和双向通信

5
我以前使用过BackgroundWorkerTask在后台执行操作,并在完成后将其发送回UI。我甚至使用了BackgroundWorkerReportProgress来创建一个无限循环(除了取消操作),以持续向UI线程发布内容。
但这次我需要更可控的场景: 后台线程不断轮询其他系统。通过Invoke,它可以向UI发送更新。但是UI如何向后台线程发送消息呢?例如更改设置等。
实际上我正在寻求最佳.NET实践,以具有以下特点的工作线程:
  • 在后台运行,不会阻塞UI
  • 可以向UI发送更新(InvokeDispatch
  • 在无限循环中运行,但可以以适当的方式暂停,恢复和停止
  • UI线程可以向后台线程发送更新的设置
在我的情况下,我仍在使用WinForms,但我想这应该没有关系吧?我稍后会将应用程序转换为WPF。
你建议哪种最佳实践?

有什么其他的建议(针对BrokenGlass)?SynchronizationContext呢? - ZoolWay
SynchronizationContext 可能可行,但那只是一个低级抽象。我的看法 是使用 TPL 和自定义任务调度程序。 - noseratio - open to work
2个回答

4
我会使用 TPL 和自定义任务调度程序来实现,类似于 Stephen Toub 的 StaTaskScheduler。这就是下面的 WorkerWithTaskScheduler 实现的内容。在这种情况下,工作线程也是一个任务调度程序,可以在其主循环中执行任意的 Task 项目(使用 ExecutePendingTasks),同时进行工作。
在工作线程的上下文中执行作为 TPL Task 包装的 lambda 表达式是向工作线程发送消息并获得结果的非常方便的方法。这可以通过同步方式使用 WorkerWithTaskScheduler.Run().Wait/Result 或异步方式使用 await WorkerWithTaskScheduler.Run() 来完成。请注意,ContinueExecutionWaitForPendingTasks 用于暂停/恢复/结束工作线程的主循环。我希望代码是不言自明的,但如果需要澄清任何内容,请告诉我。
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Console_21628490
{
    // Test
    class Program
    {
        static async Task DoWorkAsync()
        {
            Console.WriteLine("Initial thread: " + Thread.CurrentThread.ManagedThreadId);

            // the worker thread lambda
            Func<WorkerWithTaskScheduler<int>, int> workAction = (worker) =>
            {
                var result = 0;

                Console.WriteLine("Worker thread: " + Thread.CurrentThread.ManagedThreadId);

                while (worker.ContinueExecution)
                {
                    // observe cancellation
                    worker.Token.ThrowIfCancellationRequested();

                    // executed pending tasks scheduled with WorkerWithTaskScheduler.Run
                    worker.ExecutePendingTasks();

                    // do the work item
                    Thread.Sleep(200); // simulate work payload
                    result++;

                    Console.Write("\rDone so far: " + result);
                    if (result > 100)
                        break; // done after 100 items
                }
                return result;
            };

            try
            {
                // cancel in 30s
                var cts = new CancellationTokenSource(30000);
                // start the worker
                var worker = new WorkerWithTaskScheduler<int>(workAction, cts.Token);

                // pause upon Enter
                Console.WriteLine("\nPress Enter to pause...");
                Console.ReadLine();
                worker.WaitForPendingTasks = true;

                // resume upon Enter
                Console.WriteLine("\nPress Enter to resume...");
                Console.ReadLine();
                worker.WaitForPendingTasks = false;

                // send a "message", i.e. run a lambda inside the worker thread
                var response = await worker.Run(() =>
                {
                    // do something in the context of the worker thread
                    return Thread.CurrentThread.ManagedThreadId;
                }, cts.Token);
                Console.WriteLine("\nReply from Worker thread: " + response);

                // End upon Enter
                Console.WriteLine("\nPress Enter to stop...");
                Console.ReadLine();

                // worker.EndExecution() to get the result gracefully
                worker.ContinueExecution = false; // or worker.Cancel() to throw
                var result = await worker.WorkerTask;

                Console.Write("\nFinished, result: " + result);
            }
            catch (Exception ex)
            {
                while (ex is AggregateException)
                    ex = ex.InnerException;
                Console.WriteLine(ex.Message);
            }
        }

        static void Main(string[] args)
        {
            DoWorkAsync().Wait();
            Console.WriteLine("\nPress Enter to Exit.");
            Console.ReadLine();
        }
    }

    //
    // WorkerWithTaskScheduler
    //

    public class WorkerWithTaskScheduler<TResult> : TaskScheduler, IDisposable
    {
        readonly CancellationTokenSource _workerCts;
        Task<TResult> _workerTask;

        readonly BlockingCollection<Task> _pendingTasks;
        Thread _workerThread;

        volatile bool _continueExecution = true;
        volatile bool _waitForTasks = false;

        // start the main loop
        public WorkerWithTaskScheduler(
            Func<WorkerWithTaskScheduler<TResult>, TResult> executeMainLoop,
            CancellationToken token)
        {
            _pendingTasks = new BlockingCollection<Task>();
            _workerCts = CancellationTokenSource.CreateLinkedTokenSource(token);

            _workerTask = Task.Factory.StartNew<TResult>(() =>
            {
                _workerThread = Thread.CurrentThread;
                return executeMainLoop(this);
            }, _workerCts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }

        // a sample action for WorkerWithTaskScheduler constructor
        public static void ExecuteMainLoop(WorkerWithTaskScheduler<TResult> worker)
        {
            while (!worker.ContinueExecution)
            {
                worker.Token.ThrowIfCancellationRequested();
                worker.ExecutePendingTasks();
            }
        }

        // get the Task
        public Task<TResult> WorkerTask
        {
            get { return _workerTask; }
        }

        // get CancellationToken 
        public CancellationToken Token
        {
            get { return _workerCts.Token; }
        }

        // check/set if the main loop should continue
        public bool ContinueExecution
        {
            get { return _continueExecution; }
            set { _continueExecution = value; }
        }

        // request cancellation
        public void Cancel()
        {
            _workerCts.Cancel();
        }

        // check if we're on the correct thread
        public void VerifyWorkerThread()
        {
            if (Thread.CurrentThread != _workerThread)
                throw new InvalidOperationException("Invalid thread.");
        }

        // check if the worker task itself is still alive
        public void VerifyWorkerTask()
        {
            if (_workerTask == null || _workerTask.IsCompleted)
                throw new InvalidOperationException("The worker thread has ended.");
        }

        // make ExecutePendingTasks block or not block
        public bool WaitForPendingTasks
        {
            get { return _waitForTasks; }
            set 
            { 
                _waitForTasks = value;
                if (value) // wake it up
                    Run(() => { }, this.Token);
            }
        }

        // execute all pending tasks and return
        public void ExecutePendingTasks()
        {
            VerifyWorkerThread();

            while (this.ContinueExecution)
            {
                this.Token.ThrowIfCancellationRequested();

                Task item;
                if (_waitForTasks)
                {
                    item = _pendingTasks.Take(this.Token);
                }
                else
                {
                    if (!_pendingTasks.TryTake(out item))
                        break;
                }

                TryExecuteTask(item);
            }
        }

        //
        // TaskScheduler methods
        //

        protected override void QueueTask(Task task)
        {
            _pendingTasks.Add(task);
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _pendingTasks.ToArray();
        }

        protected override bool TryExecuteTaskInline(
            Task task, bool taskWasPreviouslyQueued)
        {
            return _workerThread == Thread.CurrentThread &&
                TryExecuteTask(task);
        }

        public override int MaximumConcurrencyLevel
        {
            get { return 1; }
        }

        public void Dispose()
        {
            if (_workerTask != null)
            {
                _workerCts.Cancel();
                _workerTask.Wait();
                _pendingTasks.Dispose();
                _workerTask = null;
            }
        }

        //
        // Task.Factory.StartNew wrappers using this task scheduler
        //

        public Task Run(Action action, CancellationToken token)
        {
            VerifyWorkerTask();
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
        }

        public Task<T> Run<T>(Func<T> action, CancellationToken token)
        {
            VerifyWorkerTask();
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
        }

        public Task Run(Func<Task> action, CancellationToken token)
        {
            VerifyWorkerTask();
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
        }

        public Task<T> Run<T>(Func<Task<T>> action, CancellationToken token)
        {
            VerifyWorkerTask();
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
        }
    }
}

要实现从工作线程到客户端的通知,您可以使用IProgress<T>模式此示例)。

这看起来非常有趣。我会去看看的。谢谢你分享这么多信息!我可以问一下你编辑了什么吗? - ZoolWay

1

我脑海中首先想到的,也是最干净的方法是将不断运行的后台线程方法作为类的实例方法。这个类实例可以公开属性/方法,允许其他人改变状态(例如通过UI)- 由于从不同的线程读取/更新状态,可能需要一些锁定。


你的意思是在主线程中创建一个 threadWorkerInstance 实例的新线程,并通过 RunMethod 方法对其进行操作,对吗? - ZoolWay
没错,一定要记得放置锁以处理并发。 - BrokenGlass

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接