数百个用户和大文件的良好处理方法

9
我有几个数据文件(每个文件接近1GB),其中的数据是字符串行。
我需要使用数百个消费者处理每个文件。每个消费者执行的处理与其他消费者不同。消费者不会同时写入任何地方,他们只需要输入字符串。处理后,他们会更新本地缓冲区。消费者可以轻松并行执行。
重要提示:对于一个特定的文件,每个消费者必须按照正确的顺序(按照它们在文件中出现的顺序)处理所有行,不能跳过。处理不同文件的顺序无关紧要。
单个消费者处理单个行的速度相对较快。我预计在Core i5上少于50微秒。
现在我正在寻找解决这个问题的好方法。这将成为.NET项目的一部分,请让我们仅使用.NET(C#优先)。
我知道TPL和DataFlow。我想最相关的应该是BroadcastBlock。但是我认为问题在于,对于每一行,我都必须等待所有消费者完成才能发布新的行。我想这可能不太有效率。
我认为理想情况应该是这样的:
1. 一个线程从文件中读取并写入缓冲区。 2. 每个消费者准备就绪时,从缓冲区并发读取行并进行处理。 3. 当一个消费者读取输入后,缓冲区中的条目不应被删除。只有当所有消费者都处理完毕后,它才能被删除。 4. TPL自己安排消费者线程。 5. 如果一个消费者的性能优于其他消费者,则它不应等待,并且可以从缓冲区读取更新的条目。
我的想法是否正确?无论是还是否,如何实现好的解决方案?

.NET的哪个版本?而且消费者中总会有一个是最慢的。根据行中的内容,消费者的时间会有所变化吗?消费者需要知道何时是文件的最后一个吗? - paparazzo
任何可以帮助的版本,比如4.5。不行。平均而言,所有消费者处理一行的时间几乎相同。但有时候(很少)会有一个需要更长时间。这取决于行和消费者参数。不,消费者不需要知道。我需要在“高层次”上知道这个。当文件完成后,我调用一些完成程序,然后用新文件重新开始一切。 - shda
1
开始实现并在遇到具体问题时提出问题。建议的方法是否可行取决于您代码的其余部分。如果您想讨论尚未编写的代码,请尝试访问http://programmers.stackexchange.com。 - CodeCaster
1
@Blam SO不适合讨论。我可以指出其他方法的缺陷以及你的方法(你不同意的事情可能并不是一个问题,自身会引入问题),但这不是SO的目的。 - CodeCaster
我会将问题复制给程序员们。 - shda
显示剩余2条评论
2个回答

1
我不同意一个线程从文件中读取并写入缓冲区 对于几个1 GB的文件,该线程将消耗过多的内存。 .NET有一个对象大小限制,集合是一个对象。
你需要控制读取行的速度。 我认为你可以使用BlockingCollection来做到这一点。 bc的1000000用于保持最慢的消费者忙碌,并为打开下一个文件提供一些缓冲。
using System.IO;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace BlockingCollection2
{
    /// <summary>
    /// Interaction logic for MainWindow.xaml
    /// </summary>
    public partial class MainWindow : Window
    {
        public MainWindow()
        {
            InitializeComponent();
        }
        public static void BC_GetConsumingEnumerableCollection()
        {
            List<string> fileNames = new List<string>();  // add filesNames
            string producerLine;
            System.IO.StreamReader file;
            List<BCtaskBC> bcs = new List<BCtaskBC>();  // add for each consumer
            // Kick off a producer task
            Task.Factory.StartNew(() =>
            {
                foreach(string fileName in fileNames)
                {
                    file = new System.IO.StreamReader(fileName);
                    while ((producerLine = file.ReadLine()) != null)
                    {
                        foreach (BCtaskBC bc in bcs)
                        {
                            // string is reference type but it often acts like a value type
                            // may need to make a deep copy of producerLine for this next line
                            bc.BC.Add(producerLine);  // if  any queue size gets to 1000000 then this blocks
                        }
                    }
                    file.Close();
                }                 
                // Need to do this to keep foreach below from hanging
                foreach (BCtaskBC bc in bcs)
                {
                    bc.BC.CompleteAdding();
                }
            });

            // Now consume the blocking collection with foreach. 
            // Use bc.GetConsumingEnumerable() instead of just bc because the 
            // former will block waiting for completion and the latter will 
            // simply take a snapshot of the current state of the underlying collection. 
            //  Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
            Parallel.ForEach(bcs, bc =>
            {
                foreach (string consumerLine in bc.BC.GetConsumingEnumerable())
                {
                    bc.BCtask.ProcessTask(consumerLine);  
                }
            } //close lambda expression
                 ); //close method invocation 
            // I think this need to be parallel
            //foreach (BCtaskBC bc in bcs)
            //{
            //    foreach (string consumerLine in bc.BC.GetConsumingEnumerable())
            //    {
            //        bc.BCtask.ProcessTask(consumerLine);
            //    }
            //}
        }
        public abstract class BCtaskBC
        {   // may need to do something to make this thread safe   
            private BlockingCollection<string> bc = new BlockingCollection<string>(1000000);  // this trotttles the size
            public BCtask BCtask { get; set; }
            public BlockingCollection<string> BC { get { return bc; } }
        }
        public abstract class BCtask
        {   // may need to do something to make this thread safe 
            public void ProcessTask(string S) {}
        }
    }
}

一个集合不是单个对象这件事并不完全正确。例如,List<String> 可以容纳超过 2GB 的数据。列表本身只是一堆引用。在 64 位模式下,每个引用占用 8 个字节,因此该集合可以容纳接近 2.5 亿行。每个文本行都是一个独立的对象,最大长度可达 2GB。(假设您没有使用 gcAllowVeryLargeObjects 标志。) - Jim Mischel
@JimMischel,我的经验与你不同。我曾经因为一个大型对象列表而耗尽了内存。当我将对象变小后,我能够在列表中放更多的对象。即使集合可以容纳无限数量的行,我仍然会使用这种方法。 - paparazzo
如果他每次读取时只在内存中保留每个消费者的一行,那么他不应该会耗尽内存。除非这些行非常长。我看不出一次性将所有文件读入内存或将所有消费者数据存储在内存中的原因。 - Justin
@Justin 这个评论是针对我吗? 您读了我的答案吗? 我不会将所有行都读入内存。 我也不同意一次只在内存中保留一行的处理方式。 至少在处理当前行时,我希望能够读取下一行。 那么打开下一个文件需要多长时间呢?您想在等待下一个文件打开时让程序变得不动吗? - paparazzo
我理解得对吗,每个消费者都有一个字符串缓冲区?并且每一行都会添加到所有这些缓冲区中?这样做不会有性能问题吗? - shda
显示剩余2条评论

0

我最近解决了类似的问题。但我的解决方案不是使用C#,而是使用SQL,因为我有很高的耐久性要求。但也许我的一些想法可以帮助你(这是我如何做到的):

我使用了“工作单元”范例。在您的情况下,您可以选择一个工作单元,例如100-1000行文本。在您的情况下,每个工作单元可以由文件名、起始文件位置和结束文件位置来描述。每个工作单元还具有一个标志,指示它是否已被特定消费者处理。我的工作单位存储为DB记录;您可以将它们保存为简单内存结构中的对象,例如列表。

当您的应用程序启动后,会启动一个单独的线程,按顺序读取所有文件并将工作单元添加到列表中。此线程具有要处理的文件列表,它依次读取特定数量的行,记录文件位置,并将文件名和文件位置保存到列表中。

只要列表中有可用的工作单元需要处理,消费者就会从列表开头开始处理这些单元。为了获取特定单元的文本行,消费者使用缓存对象。只要所有消费者都从列表开头开始处理,那么至少在开始时,所有消费者都有很大的机会请求相同的缓存工作单元。
缓存对象与添加工作单元到列表的线程完全独立。该对象的确切实现取决于一些附加要求,例如如果其中一个消费者崩溃或挂起该怎么办,或者应用程序重新启动时该怎么办,或者您是否同意“快速”消费者等待“慢速”消费者,您希望如何监视整个过程等等...
希望这可以帮助...

在我的帖子中,我提供了将工作单元保存在内存中而不是数据库中的建议,并且我还提供了一个包含100-1000行文本的工作单元。这通过减少上下文切换来提高性能。您还可以通过将多个消费者分配给同一线程来选择所需的并行度。 - Slava
是的,您可以将它们保存为对象在简单的内存结构中,比如列表。但是简单的列表无法像数据库那样处理并发,无论是在并发读写方面还是在多线程并发读取方面。 - paparazzo
你可以使用简单的ReaderWriterLockSlim,因为一个线程写入它,而多个消费者线程从中读取。 - Slava
写入和读取需要锁定。这是整个锁定过程。List.Add是O(n),List[i]也是O(n)。因此,如果一个工作单元包含1000个,则可以减少上下文切换。从数据库获取下一个1000是可扩展的,而从列表中获取下一个1000则不可扩展。在数据库中将1000添加到百万级别是可扩展的。将1000添加到列表中的百万级别则不可扩展。我理解工作单元,但我礼貌地建议它不能转换为内存对象。 - paparazzo
我不需要读取接下来的1000行,相反,我需要读取一个工作单元以获取1000行,这可以提高性能。 - Slava
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接