使用BlockingCollection<T>()扩展连接

6
我有一个服务器,通过TCP局域网与50个或更多设备通信。每个套接字都有一个任务运行来读取消息循环。我将每个消息缓冲到一个阻塞队列中,每个阻塞队列都使用BlockingCollection.Take()方法运行一个任务。因此,大致的代码如下: 套接字读取任务
Task.Run(() =>
{
    while (notCancelled)
    {
        element = ReadXml();
        switch (element)
        {
            case messageheader:
                MessageBlockingQueue.Add(deserialze<messageType>());
            ...
        }
    }
});

消息缓冲任务
Task.Run(() =>
{
    while (notCancelled)
    {
        Process(MessageQueue.Take());
    }
});

这么做会产生 50 多个读取任务和 50 多个任务阻塞在它们自己的缓冲区上。
我这样做是为了避免阻塞读取循环,使程序能够更公平地分配消息处理时间,或者我认为是这样的。
这种处理方式是否低效?有什么更好的方法吗?

你对这些消息做什么? - TheGeneral
@TheGeneral 数据库存储,记录并将数据发送给客户端,但某些数据确实需要额外的处理。 - FinalFortune
3个回答

8
您可能对 "channels" 工作感兴趣,特别是:System.Threading.Channels。其目的是提供异步的生产者/消费者队列,涵盖单个和多个生产者和消费者方案、上限等。通过使用异步 API,您不必占用许多线程等待某些操作完成。
您的读取循环将变为:
while (notCancelled) {
    var next = await queue.Reader.ReadAsync(optionalCancellationToken);
    Process(next);
}

以及制造商:

switch (element)
{
    case messageheader:
        queue.Writer.TryWrite(deserialze<messageType>());
        ...
}

所以: 最小的更改


或者 - 或者结合使用 - 您可以研究“管道”等内容 (https://www.nuget.org/packages/System.IO.Pipelines/) - 由于您正在处理TCP数据,这将是一个理想的选择,并且是我在 Stack Overflow 上自定义Websocket服务器时研究的东西(它处理了大量的连接)。由于API始终是异步的,因此它能够很好地平衡工作负载 - 而管道API是专门针对典型的TCP场景而设计的,例如在检测到帧边界时部分消耗传入的数据流。我写了很多关于此用法的文章,主要是 这里。请注意,“管道”不包括直接的TCP层,但“kestrel”服务器包括其中之一,或第三方库https://www.nuget.org/packages/Pipelines.Sockets.Unofficial/中包含(声明:我编写了它)。


1
有趣,我会看看它们。我曾经看过TPL Dataflow,但似乎这是更好的选择。 - FinalFortune

2

我在另一个项目中实际上也做了类似的事情。以下是我所学到的或者我会做得不同的地方:

  1. First of all, better to use dedicated threads for the reading/writing loop (with new Thread(ParameterizedThreadStart)) because Task.Run uses a pool thread and as you use it in a (nearly) endless loop the thread is practically never returned to the pool.

    var thread = new Thread(ReaderLoop) { Name = nameof(ReaderLoop) }; // priority, etc if needed
    thread.Start(cancellationToken);
    
  2. Your Process can be an event, which you can invoke asynchronously so your reader loop can be return immediately to process the new incoming packages as fast as possible:

    private void ReaderLoop(object state)
    {
        var token = (CancellationToken)state;
        while (!token.IsCancellationRequested)
        {
            try
            {
                var message = MessageQueue.Take(token);
                OnMessageReceived(new MessageReceivedEventArgs(message));
            }
            catch (OperationCanceledException)
            {
                if (!disposed && IsRunning)
                    Stop();
                break;
            }
        }
    }
    
请注意,如果委托有多个目标,它的异步调用并不是简单的。我创建了这个扩展方法,在池线程上调用委托:
public static void InvokeAsync<TEventArgs>(this EventHandler<TEventArgs> eventHandler, object sender, TEventArgs args)
{
    void Callback(IAsyncResult ar)
    {
        var method = (EventHandler<TEventArgs>)ar.AsyncState;
        try
        {
            method.EndInvoke(ar);
        }
        catch (Exception e)
        {
            HandleError(e, method);
        }
    }

    foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
        handler.BeginInvoke(sender, args, Callback, handler);
}

因此,OnMessageReceived的实现可以是:

protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
    => messageReceivedHandler.InvokeAsync(this, e);
  1. Finally it was a big lesson that BlockingCollection<T> has some performance issues. It uses SpinWait internally, whose SpinOnce method waits longer and longer times if there is no incoming data for a long time. This is a tricky issue because even if you log every single step of the processing you will not notice that everything is started delayed unless you can mock also the server side. Here you can find a fast BlockingCollection implementation using an AutoResetEvent for triggering incoming data. I added a Take(CancellationToken) overload to it as follows:

    /// <summary>
    /// Takes an item from the <see cref="FastBlockingCollection{T}"/>
    /// </summary>
    public T Take(CancellationToken token)
    {
        T item;
        while (!queue.TryDequeue(out item))
        {
            waitHandle.WaitOne(cancellationCheckTimeout); // can be 10-100 ms
            token.ThrowIfCancellationRequested();
        }
    
        return item;
    }
    
基本上就是这样。也许并不是所有内容都适用于您的情况,例如,如果几乎立即响应不是关键问题,则常规的BlockingCollection也可以解决问题。

1

是的,这有点低效,因为它会阻塞线程池线程。 我已经讨论了这个问题使用Task.Yield解决实现生产者/消费者模式时的线程池饥饿问题

您还可以查看测试生产者-消费者模式的示例: https://github.com/BBGONE/TestThreadAffinity

您可以在循环中使用await Task.Yield,以使其他任务访问此线程。

您还可以通过使用专用线程或更好地使用自定义线程调度程序来解决它,该调度程序使用其自己的线程池。但是创建50多个普通线程是低效的。最好调整任务,使其更具合作性。

如果您使用BlockingCollection(因为它可以阻塞线程长时间等待写入(如果有界)或读取或没有要读取的项),那么最好使用System.Threading.Tasks.Channelshttps://github.com/stephentoub/corefxlab/blob/master/src/System.Threading.Tasks.Channels/README.md
当集合可用于写入或读取时,它们不会阻塞线程。下面是一个示例,说明如何使用https://github.com/BBGONE/TestThreadAffinity/tree/master/ThreadingChannelsCoreFX/ChannelsTest

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接