不加锁情况下多线程写入文件

3

我需要从不同的线程中按缓冲区逐个向文件写入数据。为了避免锁定,我将写入到不同的文件中,比如说 'file_1','file_2',最后将它们合并成一个文件 'file'。这种方法是否可行?有更好的建议吗?

一些文件非常巨大,包含数千个缓冲区。因此会创建成千上万个临时文件,然后再合并和清理。


成千上万的临时文件使用成千上万的线程?或者... - Sriram Sakthivel
1
所以,为了避免锁定,现在您有两到三倍的I/O吗? - H H
你为什么想要避免锁定? - Jim Mischel
我正在进行一些分析,以提高性能并避免争用。 - user1884330
2个回答

9

我的直觉是文件的处理会很耗时,而且管理成千上万个文件听起来很复杂容易出错。

那么,我们可以考虑让一个专门的线程进行写入操作。其他线程只需将它们的消息添加到等待写入的队列中即可。虽然需要一些同步开销,但锁中实际执行的工作非常小,只是将"指针"拷贝到消息队列中。打开文件并写入它们可能比获取互斥锁还要耗费更多时间,因此这种方法实际上可能会提高性能。


是的,我也考虑过那种方法。现在会尝试一下。 - user1884330

5
以下是一个示例方法(没有错误处理!),展示如何使用BlockingCollection来管理一个缓冲区队列以写入文件。
您可以创建一个ParallelFileWriter,然后在所有想要写入文件的线程中使用它。完成后,只需处置它(但请确保在所有线程都完成写入之前不要处置它!)。
这只是一个简单的示例,让您开始 - 您需要添加参数检查和错误处理:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    public sealed class ParallelFileWriter: IDisposable
    {
        // maxQueueSize is the maximum number of buffers you want in the queue at once.
        // If this value is reached, any threads calling Write() will block until there's
        // room in the queue.

        public ParallelFileWriter(string filename, int maxQueueSize)
        {
            _stream     = new FileStream(filename, FileMode.Create);
            _queue      = new BlockingCollection<byte[]>(maxQueueSize);
            _writerTask = Task.Run(() => writerTask());
        }

        public void Write(byte[] data)
        {
            _queue.Add(data);
        }

        public void Dispose()
        {            
            _queue.CompleteAdding();
            _writerTask.Wait();
            _stream.Close();
        }

        private void writerTask()
        {
            foreach (var data in _queue.GetConsumingEnumerable())
            {
                Debug.WriteLine("Queue size = {0}", _queue.Count);
                _stream.Write(data, 0, data.Length);
            }
        }

        private readonly Task _writerTask;
        private readonly BlockingCollection<byte[]> _queue;
        private readonly FileStream _stream;
    }

    class Program
    {
        private void run()
        {
            // For demo purposes, cancel after a couple of seconds.

            using (var fileWriter = new ParallelFileWriter(@"C:\TEST\TEST.DATA", 100))
            using (var cancellationSource = new CancellationTokenSource(2000))
            {
                const int NUM_THREADS = 8;
                Action[] actions = new Action[NUM_THREADS];

                for (int i = 0; i < NUM_THREADS; ++i)
                {
                    int id = i;
                    actions[i] = () => writer(cancellationSource.Token, fileWriter, id);
                }

                Parallel.Invoke(actions);
            }
        }

        private void writer(CancellationToken cancellation, ParallelFileWriter fileWriter, int id)
        {
            int index = 0;

            while (!cancellation.IsCancellationRequested)
            {
                string text = string.Format("{0}:{1}\n", id, index++);
                byte[] data = Encoding.UTF8.GetBytes(text);
                fileWriter.Write(data);
            }
        }

        static void Main(string[] args)
        {
            new Program().run();
        }
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接