C#多线程

4
假设我有一个事件,每秒触发10次。
void Session_OnEvent(object sender, CustomEventArgs e)
{
  //DoStuff

  DoLongOperation(e);
}

我希望每次事件被触发时,方法DoLongOperation(e)都能在一个独立的线程上运行。可以尝试以下方式实现:
new Thread(DoLongOperation).Start(e);

但我有一种感觉这对性能不利,我想要实现最佳性能,那么我应该做什么?

谢谢您的提前帮助。

编辑:当我说长时间时,我并不是指一个操作需要超过1秒钟,只是我不想让事件等待那么长时间,所以我想将其放在单独的线程中执行...


1
你能做的最好的事情:测量。 - Amy B
1
你的意思是要取消之前的LongOperation吗?还是想要对并行的LongOperation设置顶部限制? - alpha-mouse
使用线程可以将现代核心可用的CPU周期数量翻倍,对于性能来说非常有利。但是它也会使程序中未诊断错误的数量增加四倍,这对于交付日期来说是不利的。 - Hans Passant
5个回答

6
直接回答你的问题:使用托管线程池,通过利用ThreadPool.QueueUserWorkItem将操作推送到线程池中。 (您可能需要查看问题“何时使用线程池而不是自己的线程?”的答案)。

然而,请看大局:如果您启动的所有操作都需要超过100毫秒才能完成,则从数学上讲,您将产生更多的工作量而无法处理。无论如何切片,这都不会有好结果。例如,如果每次创建一个单独的线程,那么您的进程将耗尽线程;如果使用线程池,则会淹没它并使其无法完成工作,等等。

如果只有一些操作需要很长时间,而大多数操作立即完成,则您可能有机会找到实际的解决方案。否则,您需要重新考虑程序设计。


我不同意。对于每秒触发多次的事件,使用线程池来管理线程会导致显著的减速,因为线程池管理线程数量和创建新线程的方式。在这种情况下,我认为自己管理线程会更好,但是我会设置工作线程数量的上限,可能使用Interlocked系统来跟踪线程计数。 - KeithS
你怎么知道在没有更多信息的情况下它会导致“明显的减速”?我们甚至不知道什么符合OP的“显著”。此外,盲目限制线程数(或盲目执行任何操作)肯定不会有好结果。正如我所说,很可能OP需要重新考虑他们的设计。 - Jon

2

使用一个线程处理您的请求,并从事件中为该线程排队工作项。

具体来说:

  • 复制 e
  • 创建 List<CustomEventArgs> 并将副本插入末尾
  • 从线程和事件同步访问该列表

作为类的成员对象,请执行以下操作:

List< CustomEventArgs > _argsqueue;
Thread _processor;

在类的构造函数中,执行以下操作:

_argsqueue=new List< CustomEventArgs >();
_processor=new Thread(ProcessorMethod);

定义 processormethod:

void ProcessorMethod()
{
     while (_shouldEnd)
     {
         CustomEventArgs e=null;
         lock (_argsqueue)
         {
             if (_argsqueue.Count>0)
             {
                 CustomEventArgs e=_argsqueue[0];
                 _argsqueue.RemoveAt(0);
             }
         }
         if (e!=null) 
         {
             DoLongOperation(e);
         }
         else
         {
             Sleep(100);
         }
     }
}

在你的事件中:

lock (_argsqueue)
{
    _argsqueue.Add(e.Clone());
}

你需要自己处理细节,例如在表单关闭或处理相关对象时,你需要:

_shouldEnd=true;
_processor.Join();

这里有一个非常简单的例子。 :) - Daniel Mošmondor

2
如果您正在使用C# 4.0,您可能需要考虑使用任务计划程序。由于您的DoLongOperation意味着它将长时间运行,因此您应该考虑以下内容:
长时间运行的任务 您可能希望明确防止将任务放在本地队列中。例如,您可能知道特定的工作项将运行相当长的时间,并且很可能会阻塞本地队列上的所有其他工作项。在这种情况下,您可以指定LongRunning选项,该选项为调度程序提供提示,表明可能需要额外的线程来执行任务,以便它不会阻塞本地队列上的其他线程或工作项的前进进程。通过使用此选项,您完全避免了线程池,包括全局和本地队列。
使用任务计划程序的另一个好处是它具有MaximumConcurrencyLevel。在进行Jon建议的测试后,这使您能够相对轻松地调整并发性。
这是来自MSDN的一个示例,它正好可以做到这一点。
namespace System.Threading.Tasks.Schedulers
{

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;

    class Program
    {
        static void Main()
        {
            LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(1);
            TaskFactory factory = new TaskFactory(lcts);

            factory.StartNew(()=> 
                {
                    for (int i = 0; i < 500; i++)
                    {
                        Console.Write("{0} on thread {1}", i, Thread.CurrentThread.ManagedThreadId);
                    }
                }
            );

            Console.ReadKey();
        }
    }

    /// <summary>
    /// Provides a task scheduler that ensures a maximum concurrency level while
    /// running on top of the ThreadPool.
    /// </summary>
    public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
    {
        /// <summary>Whether the current thread is processing work items.</summary>
        [ThreadStatic]
        private static bool _currentThreadIsProcessingItems;
        /// <summary>The list of tasks to be executed.</summary>
        private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
        /// <summary>The maximum concurrency level allowed by this scheduler.</summary>
        private readonly int _maxDegreeOfParallelism;
        /// <summary>Whether the scheduler is currently processing work items.</summary>
        private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)

        /// <summary>
        /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
        /// specified degree of parallelism.
        /// </summary>
        /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
        public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
        {
            if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
            _maxDegreeOfParallelism = maxDegreeOfParallelism;
        }

        /// <summary>Queues a task to the scheduler.</summary>
        /// <param name="task">The task to be queued.</param>
        protected sealed override void QueueTask(Task task)
        {
            // Add the task to the list of tasks to be processed.  If there aren't enough
            // delegates currently queued or running to process tasks, schedule another.
            lock (_tasks)
            {
                _tasks.AddLast(task);
                if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
                {
                    ++_delegatesQueuedOrRunning;
                    NotifyThreadPoolOfPendingWork();
                }
            }
        }

        /// <summary>
        /// Informs the ThreadPool that there's work to be executed for this scheduler.
        /// </summary>
        private void NotifyThreadPoolOfPendingWork()
        {
            ThreadPool.UnsafeQueueUserWorkItem(_ =>
            {
                // Note that the current thread is now processing work items.
                // This is necessary to enable inlining of tasks into this thread.
                _currentThreadIsProcessingItems = true;
                try
                {
                    // Process all available items in the queue.
                    while (true)
                    {
                        Task item;
                        lock (_tasks)
                        {
                            // When there are no more items to be processed,
                            // note that we're done processing, and get out.
                            if (_tasks.Count == 0)
                            {
                                --_delegatesQueuedOrRunning;
                                break;
                            }

                            // Get the next item from the queue
                            item = _tasks.First.Value;
                            _tasks.RemoveFirst();
                        }

                        // Execute the task we pulled out of the queue
                        base.TryExecuteTask(item);
                    }
                }
                // We're done processing items on the current thread
                finally { _currentThreadIsProcessingItems = false; }
            }, null);
        }

        /// <summary>Attempts to execute the specified task on the current thread.</summary>
        /// <param name="task">The task to be executed.</param>
        /// <param name="taskWasPreviouslyQueued"></param>
        /// <returns>Whether the task could be executed on the current thread.</returns>
        protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // If this thread isn't already processing a task, we don't support inlining
            if (!_currentThreadIsProcessingItems) return false;

            // If the task was previously queued, remove it from the queue
            if (taskWasPreviouslyQueued) TryDequeue(task);

            // Try to run the task.
            return base.TryExecuteTask(task);
        }

        /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
        /// <param name="task">The task to be removed.</param>
        /// <returns>Whether the task could be found and removed.</returns>
        protected sealed override bool TryDequeue(Task task)
        {
            lock (_tasks) return _tasks.Remove(task);
        }

        /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
        public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }

        /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
        /// <returns>An enumerable of the tasks currently scheduled.</returns>
        protected sealed override IEnumerable<Task> GetScheduledTasks()
        {
            bool lockTaken = false;
            try
            {
                Monitor.TryEnter(_tasks, ref lockTaken);
                if (lockTaken) return _tasks.ToArray();
                else throw new NotSupportedException();
            }
            finally
            {
                if (lockTaken) Monitor.Exit(_tasks);
            }
        }
    }
}

1

性能将严重依赖于几个因素:

  • 将同时运行多少个线程?
  • 他们将要做什么?
  • 他们将要运行多长时间?(最短执行时间、最长执行时间、平均时间)
  • 如果其中一个线程突然终止会发生什么?

每秒十次的活动率相当高。根据执行的持续时间,使用单独的进程(如服务)可能更有意义。 这个活动显然必须是线程安全的,这意味着(部分地)没有资源争用。 如果两个线程可能需要更新同一资源(文件、内存位置),则需要使用锁定。 如果处理不当,这可能会影响效率。


0

可以这样做。但是,当你的事件每秒触发10次,并且每秒开始10个长时间运行的操作时,你很快就会耗尽线程。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接