线程间通信时间

3
我正在通过端口和接收器连接15个异步操作。这让我非常关注线程间消息传递的时间,尤其是任务将数据发布到端口后,在不同线程上开始处理相同数据之间所需的时间。假设每个线程都在启动时处于空闲状态的最佳情况下,我生成了一个测试,使用计时器类来测量两个不同调度程序之间的时间,每个调度程序都以单个线程的最高优先级运行。
令我惊讶的是,我的开发机是一台运行Windows 7 x64的Q6600四核2.4 Ghz电脑,我的测试平均上下文切换时间为5.66微秒,标准差为5.738微秒,最大值接近1.58毫秒(282倍!)。计时器频率为427.7纳秒,因此我仍然远离传感器噪声。
我想要做的是尽可能地减少线程间消息传递时间,并同样重要的是减少上下文切换的标准差。我意识到Windows不是实时操作系统,没有任何保证,但Windows调度程序是一个公平的轮询优先级调度程序,并且此测试中的两个线程都处于最高优先级(唯一应该如此的线程),因此不应该有任何线程上的上下文切换(通过1.58 ms最长时间可见...我相信Windows quantum是15.65 ms?)我唯一能想到的是操作系统调用CCR使用的锁定机制的时间差异,以在线程之间传递消息。
请告诉我是否有其他人测量过线程间消息传递时间,并且对如何改进它有任何建议。
这是我的测试源代码:
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using Microsoft.Ccr.Core;

using System.Diagnostics;

namespace Test.CCR.TestConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Starting Timer");
            var sw = new Stopwatch();
            sw.Start();

            var dispatcher = new Dispatcher(1, ThreadPriority.Highest, true, "My Thread Pool");
            var dispQueue = new DispatcherQueue("Disp Queue", dispatcher);

            var sDispatcher = new Dispatcher(1, ThreadPriority.Highest, true, "Second Dispatcher");
            var sDispQueue = new DispatcherQueue("Second Queue", sDispatcher);

            var legAPort = new Port<EmptyValue>();
            var legBPort = new Port<TimeSpan>();

            var distances = new List<double>();

            long totalTicks = 0;

            while (sw.Elapsed.TotalMilliseconds < 5000) ;

            int runCnt = 100000;
            int offset = 1000;

            Arbiter.Activate(dispQueue, Arbiter.Receive(true, legAPort, i =>
                                                                            {
                                                                                TimeSpan sTime = sw.Elapsed;
                                                                                legBPort.Post(sTime);
                                                                            }));
            Arbiter.Activate(sDispQueue, Arbiter.Receive(true, legBPort, i =>
                                                                             {
                                                                                 TimeSpan eTime = sw.Elapsed;
                                                                                 TimeSpan dt = eTime.Subtract(i);
                                                                                 //if (distances.Count == 0 || Math.Abs(distances[distances.Count - 1] - dt.TotalMilliseconds) / distances[distances.Count - 1] > 0.1)
                                                                                 distances.Add(dt.TotalMilliseconds);

                                                                                 if(distances.Count > offset)
                                                                                 Interlocked.Add(ref totalTicks,
                                                                                                 dt.Ticks);
                                                                                 if(distances.Count < runCnt)
                                                                                     legAPort.Post(EmptyValue.SharedInstance);
                                                                             }));


            //Thread.Sleep(100);
            legAPort.Post(EmptyValue.SharedInstance);

            Thread.Sleep(500);

            while (distances.Count < runCnt)
                Thread.Sleep(25);

            TimeSpan exTime = TimeSpan.FromTicks(totalTicks);
            double exMS = exTime.TotalMilliseconds / (runCnt - offset);

            Console.WriteLine("Exchange Time: {0} Stopwatch Resolution: {1}", exMS, Stopwatch.Frequency);

            using(var stw = new StreamWriter("test.csv"))
            {
                for(int ix=0; ix < distances.Count; ix++)
                {
                    stw.WriteLine("{0},{1}", ix, distances[ix]);
                }
                stw.Flush();
            }

            Console.ReadKey();
        }
    }
}

你为什么说“线程上不应该有任何上下文切换”?我能想到的唯一可能是,如果你能够保证一个线程独占一个核心,那么这种情况才可能发生。从我对 CCR 文档的简要浏览中,我没有发现任何可以实现这一点的功能。 - sipsorcery
你说得对 - 这并不是保证,但是在最高优先级下运行,唯一可能中断线程的是内核中断或另一个以最高优先级运行的进程(这应该不会发生)... - Superman
你不能显式地改变上下文切换的标准差。这取决于太多因素。而且内核也会与其他进程一起使用CPU。 - user224579
4个回答

2

Windows不是一个实时操作系统,但您已经知道了。困扰您的是上下文切换时间,而不一定是消息传递时间。您没有真正指定您的进程间通信是如何工作的。如果您只是运行多个线程,那么不要使用Windows消息作为通信协议,而是尝试使用应用程序托管的消息队列来滚动自己的IPC,您会发现一些收益。

任何版本的Windows在上下文切换发生时,您可以期望的最佳平均值为1毫秒。当您的应用程序必须向内核让步时,您可能会看到1毫秒的时间。这是Ring-1应用程序(用户空间)的设计。如果绝对关键的是要低于1毫秒,则需要将应用程序中的一些部分切换到Ring-0,这意味着编写设备驱动程序。

设备驱动程序不会像用户应用程序那样遭受上下文切换时间,并且还具有纳秒分辨率计时器和休眠调用的访问权限。如果确实需要这样做,则DDK(设备驱动程序开发工具包)可从Microsoft免费获取,但我强烈建议您投资于第三方开发工具包。它们通常具有非常好的示例和许多设置正确的向导,这需要您阅读DDK文档数月才能发现。您还需要获取类似SoftIce的东西,因为普通的Visual Studio调试器无法帮助您调试设备驱动程序。


2
这15个异步操作一定要是异步的吗?也就是说,你是否被某个库的限制所迫,必须以这种方式进行操作,或者你可以选择进行同步调用?
如果你有选择权,你需要构建你的应用程序,以便通过配置参数控制使用异步性。异步操作返回到不同线程与同步操作返回到同一线程之间的区别应该在代码中透明。这样你就可以在不改变代码结构的情况下进行调整。
术语“令人尴尬的并行”描述了一个算法,在这个算法中,大部分工作显然是独立的,因此可以按任意顺序完成,从而容易进行并行化。
但是你正在“通过端口和接收器将15个异步操作链接在一起”。这可以描述为“令人尴尬的串行”。换句话说,同一个程序可以在单个线程上逻辑地编写。但是,你会失去在异步操作之间发生的任何CPU绑定工作的并行性(假设有任何重要的工作)。
如果你编写一个简单的测试来削减任何重要的CPU绑定工作,只测量上下文切换时间,那么你会测量上下文切换时间的变化,就像你已经发现的那样。
运行多个线程的唯一原因是因为你有大量的工作要交给CPU去处理,所以你想在几个CPU之间共享它。如果计算单元的时间足够短暂,那么上下文切换将是任何操作系统的一个重要开销。通过将你的计算分解成15个非常短的阶段,你实际上是在请求操作系统做很多不必要的上下文切换。

0
为了解决这个问题,你是否需要拥有如此多的解耦异步操作呢?也许考虑垂直分割工作(异步处理numCores数据块)而不是水平分割工作(现在每个数据块都要经过15个解耦阶段处理)会更有用;同步耦合一些阶段以将总数减少到较小的数量。
线程间通信的开销始终是不可忽略的。如果你的15个操作中有一些只做了一小部分工作,上下文切换会影响你的性能。

0

ThreadPriority.Highest并不意味着仅线程调度本身具有更高的优先级。Win32 API具有更细粒度的线程优先级(clicky),比Highest高出几个级别(如果我没记错,Highest通常是最高优先级的非管理员代码可以运行的优先级,管理员可以安排更高的优先级,任何硬件驱动程序/内核模式代码也可以)。因此,不能保证它们不会被抢占。

即使一个线程以最高优先级运行,如果它们持有高优先级线程需要的资源锁,Windows也可以提升其他线程的基本优先级。这也是您可能遭受上下文切换的另一个可能性。

即使如此,正如您所说,Windows不是实时操作系统,也不能保证遵守线程优先级。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接