我会使用
TPL 和自定义任务调度程序来实现,类似于 Stephen Toub 的
StaTaskScheduler
。这就是下面的
WorkerWithTaskScheduler
实现的内容。在这种情况下,工作线程也是一个任务调度程序,可以在其主循环中执行任意的
Task
项目(使用
ExecutePendingTasks
),同时进行工作。
在工作线程的上下文中执行作为 TPL
Task
包装的 lambda 表达式是向工作线程发送消息并获得结果的非常方便的方法。这可以通过同步方式使用
WorkerWithTaskScheduler.Run().Wait/Result
或异步方式使用
await WorkerWithTaskScheduler.Run()
来完成。请注意,
ContinueExecution
和
WaitForPendingTasks
用于暂停/恢复/结束工作线程的主循环。我希望代码是不言自明的,但如果需要澄清任何内容,请告诉我。
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Console_21628490
{
class Program
{
static async Task DoWorkAsync()
{
Console.WriteLine("Initial thread: " + Thread.CurrentThread.ManagedThreadId);
Func<WorkerWithTaskScheduler<int>, int> workAction = (worker) =>
{
var result = 0;
Console.WriteLine("Worker thread: " + Thread.CurrentThread.ManagedThreadId);
while (worker.ContinueExecution)
{
worker.Token.ThrowIfCancellationRequested();
worker.ExecutePendingTasks();
Thread.Sleep(200);
result++;
Console.Write("\rDone so far: " + result);
if (result > 100)
break;
}
return result;
};
try
{
var cts = new CancellationTokenSource(30000);
var worker = new WorkerWithTaskScheduler<int>(workAction, cts.Token);
Console.WriteLine("\nPress Enter to pause...");
Console.ReadLine();
worker.WaitForPendingTasks = true;
Console.WriteLine("\nPress Enter to resume...");
Console.ReadLine();
worker.WaitForPendingTasks = false;
var response = await worker.Run(() =>
{
return Thread.CurrentThread.ManagedThreadId;
}, cts.Token);
Console.WriteLine("\nReply from Worker thread: " + response);
Console.WriteLine("\nPress Enter to stop...");
Console.ReadLine();
worker.ContinueExecution = false;
var result = await worker.WorkerTask;
Console.Write("\nFinished, result: " + result);
}
catch (Exception ex)
{
while (ex is AggregateException)
ex = ex.InnerException;
Console.WriteLine(ex.Message);
}
}
static void Main(string[] args)
{
DoWorkAsync().Wait();
Console.WriteLine("\nPress Enter to Exit.");
Console.ReadLine();
}
}
public class WorkerWithTaskScheduler<TResult> : TaskScheduler, IDisposable
{
readonly CancellationTokenSource _workerCts;
Task<TResult> _workerTask;
readonly BlockingCollection<Task> _pendingTasks;
Thread _workerThread;
volatile bool _continueExecution = true;
volatile bool _waitForTasks = false;
public WorkerWithTaskScheduler(
Func<WorkerWithTaskScheduler<TResult>, TResult> executeMainLoop,
CancellationToken token)
{
_pendingTasks = new BlockingCollection<Task>();
_workerCts = CancellationTokenSource.CreateLinkedTokenSource(token);
_workerTask = Task.Factory.StartNew<TResult>(() =>
{
_workerThread = Thread.CurrentThread;
return executeMainLoop(this);
}, _workerCts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
public static void ExecuteMainLoop(WorkerWithTaskScheduler<TResult> worker)
{
while (!worker.ContinueExecution)
{
worker.Token.ThrowIfCancellationRequested();
worker.ExecutePendingTasks();
}
}
public Task<TResult> WorkerTask
{
get { return _workerTask; }
}
public CancellationToken Token
{
get { return _workerCts.Token; }
}
public bool ContinueExecution
{
get { return _continueExecution; }
set { _continueExecution = value; }
}
public void Cancel()
{
_workerCts.Cancel();
}
public void VerifyWorkerThread()
{
if (Thread.CurrentThread != _workerThread)
throw new InvalidOperationException("Invalid thread.");
}
public void VerifyWorkerTask()
{
if (_workerTask == null || _workerTask.IsCompleted)
throw new InvalidOperationException("The worker thread has ended.");
}
public bool WaitForPendingTasks
{
get { return _waitForTasks; }
set
{
_waitForTasks = value;
if (value)
Run(() => { }, this.Token);
}
}
public void ExecutePendingTasks()
{
VerifyWorkerThread();
while (this.ContinueExecution)
{
this.Token.ThrowIfCancellationRequested();
Task item;
if (_waitForTasks)
{
item = _pendingTasks.Take(this.Token);
}
else
{
if (!_pendingTasks.TryTake(out item))
break;
}
TryExecuteTask(item);
}
}
protected override void QueueTask(Task task)
{
_pendingTasks.Add(task);
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return _pendingTasks.ToArray();
}
protected override bool TryExecuteTaskInline(
Task task, bool taskWasPreviouslyQueued)
{
return _workerThread == Thread.CurrentThread &&
TryExecuteTask(task);
}
public override int MaximumConcurrencyLevel
{
get { return 1; }
}
public void Dispose()
{
if (_workerTask != null)
{
_workerCts.Cancel();
_workerTask.Wait();
_pendingTasks.Dispose();
_workerTask = null;
}
}
public Task Run(Action action, CancellationToken token)
{
VerifyWorkerTask();
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
}
public Task<T> Run<T>(Func<T> action, CancellationToken token)
{
VerifyWorkerTask();
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
}
public Task Run(Func<Task> action, CancellationToken token)
{
VerifyWorkerTask();
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
}
public Task<T> Run<T>(Func<Task<T>> action, CancellationToken token)
{
VerifyWorkerTask();
return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
}
}
}
要实现从工作线程到客户端的通知,您可以使用
IProgress<T>
模式(
此示例)。
SynchronizationContext
呢? - ZoolWaySynchronizationContext
可能可行,但那只是一个低级抽象。我的看法 是使用 TPL 和自定义任务调度程序。 - noseratio - open to work