异步WCF自托管服务

8
我的目标是实现一个异步自托管的WCF服务,它将在单个线程中运行所有请求,并充分利用新的C# 5异步特性。
我的服务器将是一个控制台应用程序,在其中我将设置一个SingleThreadSynchronizationContext,如此处所指定,创建并打开一个ServiceHost,然后运行SynchronizationContext,以便所有WCF请求都在同一线程中处理。
问题在于,尽管服务器能够成功地在同一线程中处理所有请求,但异步操作会阻塞执行并被序列化,而不是交错执行。
我准备了一个简化的示例来重现这个问题。
以下是我的服务合同(服务器和客户端相同):
[ServiceContract]
public interface IMessageService
{
    [OperationContract]
    Task<bool> Post(String message);
}

服务实现如下(稍微简化,但最终实现可能访问数据库或异步调用其他服务):
public class MessageService : IMessageService
{
    public async Task<bool> Post(string message)
    {
        Console.WriteLine(string.Format("[Thread {0} start] {1}", Thread.CurrentThread.ManagedThreadId, message));

        await Task.Delay(5000);

        Console.WriteLine(string.Format("[Thread {0} end] {1}", Thread.CurrentThread.ManagedThreadId, message));

        return true;
    }
}

该服务托管在控制台应用程序中:

static void Main(string[] args)
{
    var syncCtx = new SingleThreadSynchronizationContext();
    SynchronizationContext.SetSynchronizationContext(syncCtx);

    using (ServiceHost serviceHost = new ServiceHost(typeof(MessageService)))
    {
        NetNamedPipeBinding binding = new NetNamedPipeBinding(NetNamedPipeSecurityMode.None);

        serviceHost.AddServiceEndpoint(typeof(IMessageService), binding, address);
        serviceHost.Open();

        syncCtx.Run();

        serviceHost.Close();
    }
}

正如您所看到的,我首先设置了一个单线程的SynchronizationContext。接着,我创建、配置并打开了一个ServiceHost。根据这篇文章,由于我在其创建之前设置了SynchronizationContext,ServiceHost将捕获它,并且所有客户端请求都将被发布在SynchronizationContext中。接下来,我在同一线程中启动了SingleThreadSynchronizationContext

我创建了一个测试客户端,以fire-and-forget方式调用服务器。

static void Main(string[] args)
{
    EndpointAddress ep = new EndpointAddress(address);
    NetNamedPipeBinding binding = new NetNamedPipeBinding(NetNamedPipeSecurityMode.None);
    IMessageService channel = ChannelFactory<IMessageService>.CreateChannel(binding, ep);

    using (channel as IDisposable)
    {
        while (true)
        {
            string message = Console.ReadLine();
            channel.Post(message);
        }
    }
}

当我执行这个例子时,我得到了以下结果:
客户端

enter image description here

服务器

enter image description here

客户端发送的消息间隔很短(<1s)。我期望服务器能接收第一个调用并在SingleThreadSynchronizationContext中运行它(排队一个新的WorkItem)。当遇到await关键字时,将再次捕获SynchronizationContext,将继续发布到其上,并且此时该方法将返回一个任务,释放SynchronizationContext以处理第二个请求(至少开始处理它)。
从服务器日志中可以看到,请求已经被正确地发布在SynchronizationContext中。然而,从时间戳可以看出,第一个请求在第二个请求启动之前就已经完成了,这完全违背了异步服务器的目的。
为什么会发生这种情况?
实现WCF自托管异步服务器的正确方式是什么?
我认为问题在于SingleThreadSynchronizationContext,但我不知道如何以其他方式实现它。
我研究了这个主题,但我找不到更多有用的关于异步WCF服务托管的信息,特别是使用基于任务的模式。

添加

这是我对SingleThreadedSinchronizationContext的实现。基本上与article中的实现相同:

public sealed class SingleThreadSynchronizationContext  
        : SynchronizationContext
{
    private readonly BlockingCollection<WorkItem> queue = new BlockingCollection<WorkItem>();

    public override void Post(SendOrPostCallback d, object state)
    {
        this.queue.Add(new WorkItem(d, state));
    }

    public void Complete() { 
        this.queue.CompleteAdding(); 
    }

    public void Run(CancellationToken cancellation = default(CancellationToken))
    {
        WorkItem workItem;

        while (this.queue.TryTake(out workItem, Timeout.Infinite, cancellation))
            workItem.Action(workItem.State);
    }
}

public class WorkItem
{
    public SendOrPostCallback Action { get; set; }
    public object State { get; set; }

    public WorkItem(SendOrPostCallback action, object state)
    {
        this.Action = action;
        this.State = state;
    }
}

为什么使用阻塞队列?我会使用ConcurrentQueue。 - Richard Schneider
这是文章中建议的实现方式,但我同意它,因为SingleThreadSynchronizationContext必须等待下一个工作项被插入。BlockingCollection.TryTake会阻塞直到有可用项,并且还有一种优雅的方法可以使用CancellationToken取消操作。在这种情况下,为什么ConcurrentQueue更可取?完成一个执行后,你将如何等待下一个工作项? - Arthur Nunes
1个回答

9
你需要应用ConcurrencyMode.Multiple
这里的术语有点令人困惑,因为在这种情况下它实际上并不像MSDN文档所述的那样是“多线程”的。它意味着并发。默认情况下(单一并发性),WCF将延迟其他请求,直到原始操作已完成,因此您需要指定多个并发性以允许重叠(并发)请求。你的SynchronizationContext仍然保证只有一个线程来处理所有的请求,因此它并不是真正的多线程。它是单线程并发性。
另外,你可能想考虑使用具有更清晰关闭语义的不同SynchronizationContext。你目前正在使用的SingleThreadSynchronizationContext如果你调用了Complete方法将会"clamp shut";任何在await中的async方法都永远不会被恢复。
我有一个具有更好支持清理关闭的AsyncContext类型。 如果你安装了Nito.AsyncEx NuGet包,你可以使用如下的服务器代码:
static SynchronizationContext syncCtx;
static ServiceHost serviceHost;

static void Main(string[] args)
{
    AsyncContext.Run(() =>
    {
        syncCtx = SynchronizationContext.Current;
        syncCtx.OperationStarted();
        serviceHost = new ServiceHost(typeof(MessageService));
        Console.CancelKeyPress += Console_CancelKeyPress;

        var binding = new NetNamedPipeBinding(NetNamedPipeSecurityMode.None);
        serviceHost.AddServiceEndpoint(typeof(IMessageService), binding, address);
        serviceHost.Open();
    });
}

static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
{
    if (serviceHost != null)
    {
        serviceHost.BeginClose(_ => syncCtx.OperationCompleted(), null);
        serviceHost = null;
    }

    if (e.SpecialKey == ConsoleSpecialKey.ControlC)
        e.Cancel = true;
}

这将把Ctrl-C转换为“软”退出,这意味着只要有客户端连接(或者直到“关闭”超时),应用程序就会继续运行。在关闭期间,现有的客户端连接可以发出新请求,但新的客户端连接将被拒绝。
Ctrl-Break仍然是“硬”退出;在控制台主机中无法更改此设置。

ConcurrencyMode.Multiple是关键。一旦我在服务的行为中进行了配置,应用程序就按预期工作了。我将看一下Nito.AsyncEx,它看起来很有前途。谢谢! - Arthur Nunes

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接