有一个文件夹里面包含成千上万个小文本文件。我想在更多文件被添加到文件夹时解析和处理它们。我的意图是将此操作多线程化,因为单线程原型需要六分钟才能处理1000个文件。
我希望有读取线程和写入线程,如下所示。当读取线程正在读取文件时,我希望有写入线程来处理它们。一旦读取器开始读取文件,我希望将其标记为正在处理中,例如通过重命名。读取完后,将其重命名为已完成。
如何处理这样的多线程应用程序?
使用分布式哈希表还是队列更好呢?
哪种数据结构可以避免锁定?
这种方案有更好的方法吗?
有一个文件夹里面包含成千上万个小文本文件。我想在更多文件被添加到文件夹时解析和处理它们。我的意图是将此操作多线程化,因为单线程原型需要六分钟才能处理1000个文件。
我希望有读取线程和写入线程,如下所示。当读取线程正在读取文件时,我希望有写入线程来处理它们。一旦读取器开始读取文件,我希望将其标记为正在处理中,例如通过重命名。读取完后,将其重命名为已完成。
如何处理这样的多线程应用程序?
使用分布式哈希表还是队列更好呢?
哪种数据结构可以避免锁定?
这种方案有更好的方法吗?
鉴于大家对.NET 4如何处理这个问题有兴趣,以下是该方法。抱歉,这对提问者可能不是一个选项。免责声明:这不是一项高度科学的分析,只是表明了明显的性能优势。根据硬件配置,您的结果可能会有很大不同。
这是一个快速测试(如果您在这个简单的测试中看到了一个很大的错误,请评论,我们可以改进它使其更加有用/准确)。为此,我只是在一个目录中放了12,000个大小约为60KB的文件作为样本(启动LINQPad; 您可以免费玩耍! - 请确保获取LINQPad 4):
var files =
Directory.GetFiles("C:\\temp", "*.*", SearchOption.AllDirectories).ToList();
var sw = Stopwatch.StartNew(); //start timer
files.ForEach(f => File.ReadAllBytes(f).GetHashCode()); //do work - serial
sw.Stop(); //stop
sw.ElapsedMilliseconds.Dump("Run MS - Serial"); //display the duration
sw.Restart();
files.AsParallel().ForAll(f => File.ReadAllBytes(f).GetHashCode()); //parallel
sw.Stop();
sw.ElapsedMilliseconds.Dump("Run MS - Parallel");
List<T>
)不是线程安全的,因此在并行场景中使用它不是一个好主意 :) 幸运的是,在.NET 4中添加了并发集合,这些集合是线程安全的。还要记住,如果您正在使用锁定集合,则根据情况,这可能也是一个瓶颈。
这里使用了.NET 4.0中提供的.AsParallel<T>(IEnumeable<T>)
和.ForAll<T>(ParallelQuery<T>)
扩展方法。.AsParallel()
调用将IEnumerable<T>
包装在一个ParallelEnumerableWrapper<T>
(内部类)中,该类实现了ParallelQuery<T>
。这样就可以使用并行扩展方法,在这种情况下,我们使用.ForAll()
。
.ForAll()
内部创建了一个 ForAllOperator<T>(query, action)
并同步运行它。这处理线程和合并线程的操作,直到它运行结束... 在其中有很多操作,如果你想学习更多内容,包括其他选项,我建议从 这里开始。
计算机规格-供比较:
计算机规格-供比较:
这次我没有CPU/RAM的链接,它们是预装的。这是一台Dell M6400笔记本电脑 (这里有M6500的链接... Dell的自己的6400链接已经失效)。
这些数字是从10次运行中获取的,取内部8个结果的最小值/最大值(尽可能排除每个结果的原始最小值/最大值作为离群值)。我们在这里遇到了I/O瓶颈,特别是在物理驱动器上,但考虑一下串行方法的工作方式。它读取、处理、读取、处理,反复循环。采用并行方法,您可以同时读取和处理(即使存在I/O瓶颈)。在最坏的瓶颈情况下,您正在处理一个文件,同时读取下一个文件。仅仅这样(在任何当前计算机上!)都应该产生一些性能提升。您可以看到我们可以同时处理多个文件,在上面的结果中得到了一些健康的提升。
另一个免责声明:四核+ .NET 4并行不会给您带来四倍的性能,它不会线性扩展...还有其他的考虑因素和瓶颈。
我希望这个回答对于那些对这种方法和潜在好处感兴趣的人有所帮助。欢迎批评或改进...这个回答存在的唯一目的就是为了那些在评论中表明感兴趣的人 :)
这种情况下,生产者/消费者模式可能是最有用的。您应该创建足够的线程来最大化吞吐量。
以下是一些关于生产者/消费者模式的问题,以帮助您了解其工作原理:
您应该使用阻塞队列,生产者应将文件添加到队列中,而消费者则从队列中处理文件。阻塞队列不需要锁定,因此这是解决您问题最高效的方法。
如果您使用的是.NET 4.0,则可以直接使用几个并发集合:
单个生产者线程可能是从磁盘中加载文件并将其推送到队列的最有效方法;随后,多个消费者将从队列中弹出项目并处理它们。我建议您尝试每个核心使用2-4个消费者线程,并进行一些性能测量,以确定哪个最优化(即提供最大吞吐量的线程数)。对于这个特定的示例,我不建议使用线程池。
P.S. 我不明白为什么会担心单点故障和分布式哈希表的使用?我知道DHT听起来很酷,但除非您有正在解决的具体问题,否则我建议先尝试传统方法。
class Program
{
static void Main(string[] args)
{
Supervisor super = new Supervisor();
super.LaunchWaitingThreads();
while (!super.Done) { Thread.Sleep(200); }
Console.WriteLine("\nDone");
Console.ReadKey();
}
}
public delegate void StartCallbackDelegate(int idArg, Worker workerArg);
public delegate void DoneCallbackDelegate(int idArg);
public class Supervisor
{
Queue<Thread> waitingThreads = new Queue<Thread>();
Dictionary<int, Worker> runningThreads = new Dictionary<int, Worker>();
int maxThreads = 20;
object locker = new object();
public bool Done {
get {
lock (locker) {
return ((waitingThreads.Count == 0) && (runningThreads.Count == 0));
}
}
}
public Supervisor()
{
// queue up a thread for each file
Directory.GetFiles("C:\\folder").ToList().ForEach(n => waitingThreads.Enqueue(CreateThread(n)));
}
Thread CreateThread(string fileNameArg)
{
Thread thread = new Thread(new Worker(fileNameArg, WorkerStart, WorkerDone).ProcessFile);
thread.IsBackground = true;
return thread;
}
// called when a worker starts
public void WorkerStart(int threadIdArg, Worker workerArg)
{
lock (locker)
{
// update with worker instance
runningThreads[threadIdArg] = workerArg;
}
}
// called when a worker finishes
public void WorkerDone(int threadIdArg)
{
lock (locker)
{
runningThreads.Remove(threadIdArg);
}
Console.WriteLine(string.Format(" Thread {0} done", threadIdArg.ToString()));
LaunchWaitingThreads();
}
// launches workers until max is reached
public void LaunchWaitingThreads()
{
lock (locker)
{
while ((runningThreads.Count < maxThreads) && (waitingThreads.Count > 0))
{
Thread thread = waitingThreads.Dequeue();
runningThreads.Add(thread.ManagedThreadId, null); // place holder so count is accurate
thread.Start();
}
}
}
}
public class Worker
{
string fileName;
StartCallbackDelegate startCallback;
DoneCallbackDelegate doneCallback;
public Worker(string fileNameArg, StartCallbackDelegate startCallbackArg, DoneCallbackDelegate doneCallbackArg)
{
fileName = fileNameArg;
startCallback = startCallbackArg;
doneCallback = doneCallbackArg;
}
public void ProcessFile()
{
startCallback(Thread.CurrentThread.ManagedThreadId, this);
Console.WriteLine(string.Format("Reading file {0} on thread {1}", fileName, Thread.CurrentThread.ManagedThreadId.ToString()));
File.ReadAllBytes(fileName);
doneCallback(Thread.CurrentThread.ManagedThreadId);
}
}
你可以有一个中央队列,读取线程需要在将内存内容推送到队列期间获得写入访问权限。处理线程需要读取此中央队列以弹出下一个要处理的内存流。这样,您可以最小化在锁定中花费的时间,并且不必处理无锁代码的复杂性。
编辑:理想情况下,您应该优雅地处理所有异常/错误条件(如果有),以便您没有失败点。
作为替代方案,您可以拥有多个线程,每个线程在处理之前通过重命名来“声明”文件,因此文件系统成为实现锁定访问的方式。我不知道这是否比我的原始答案更有效,只有测试才能告诉。
一般来说,处理1000个小文件(顺便问一下,有多小?)不应该需要六分钟的时间。作为一个快速测试,在包含这些文件的目录中执行find "foobar" *
命令(引号中的第一个参数无关紧要,可以是任何内容),看看处理每个文件需要多长时间。如果超过了一秒钟,我会感到失望。
假设这个测试证实了我的怀疑,那么这个过程就是CPU密集型的,将读取操作分离到自己的线程中不会有任何改进。你应该:
你可以考虑使用文件队列来处理。在启动时通过扫描目录填充队列,然后使用FileSystemWatcher更新队列以高效地将新文件添加到队列中,而无需不断重新扫描目录。
如果可能的话,请读写不同的物理磁盘。这将为您提供最大的IO性能。
如果您有一大堆要处理的文件,然后不断以不均匀的速度添加新文件,并且所有这些都发生在同一个磁盘(读/写),则可以考虑将已处理的文件缓冲到内存中,直到满足以下两个条件之一:
如果您实际处理文件的过程需要大量CPU时间,则可以考虑每个CPU核心使用一个处理线程。但是,对于“正常”处理,CPU时间与IO时间相比微不足道,复杂性不值得任何小的收益。