在C++ Unix中异步写入文件

13

我有一个需要在每次循环中向文件写入数据的长循环。问题是,写入文件可能很慢,因此我希望通过异步写入来减少这个过程所需的时间。

有人知道好的方法吗?我应该创建一个线程来消耗写入缓冲区的任何内容(在这种情况下,单一生产者,单一消费者)吗?

我主要感兴趣的解决方案都不涉及除C++11标准库之外的任何东西。


2
这不是标准库的一部分,但如果您不喜欢标准库的解决方案,您应该查看libuv - tay10r
@TaylorFlores:谢谢!我会去看一下,但是初步看来,这似乎比我需要的要多得多。 - Andrew Spott
1
你现在使用什么函数进行读写操作?如果你还没有使用stdio库(它提供了缓冲I/O),可以尝试一下。如果你已经在使用,可以尝试调用setvbuf来增加缓冲区大小。 - Eric
2个回答

19

在涉及异步写入之前,如果您正在使用IOStreams,则可能希望尝试避免意外刷新流,例如通过使用std::endl而是使用'\n'。由于向IOStreams写入是有缓冲的,这可以大大提高性能。

如果这还不够,下一个问题是如何编写数据。如果进行了很多格式化操作,实际格式化可能占据了大部分时间。您可能能够将格式化推迟到单独的线程中,但这与仅将几个字节传递到另一个线程有很大区别:您需要传递一个包含要格式化的数据的合适数据结构。具体适合什么可能取决于您实际编写的内容。

最后,如果将缓冲区写入文件真的成为瓶颈,并且您想坚持使用标准C++库,那么可能有一个编写器线程,它侦听来自适当流缓冲区的缓冲区填充的队列,并将这些缓冲区写入std::ofstream中:生产者接口将是发送固定大小的缓冲区,当缓冲区已满或流被刷新时(我会明确使用std::flush)到另一个读取侦听的队列上的std::ostream。以下是仅使用标准库设施对该想法的快速实现:

#include <condition_variable>
#include <fstream>
#include <mutex>
#include <queue>
#include <streambuf>
#include <string>
#include <thread>
#include <vector>

struct async_buf
    : std::streambuf
{
    std::ofstream                 out;
    std::mutex                    mutex;
    std::condition_variable       condition;
    std::queue<std::vector<char>> queue;
    std::vector<char>             buffer;
    bool                          done;
    std::thread                   thread;

    void worker() {
        bool local_done(false);
        std::vector<char> buf;
        while (!local_done) {
            {
                std::unique_lock<std::mutex> guard(this->mutex);
                this->condition.wait(guard,
                                     [this](){ return !this->queue.empty()
                                                   || this->done; });
                if (!this->queue.empty()) {
                    buf.swap(queue.front());
                    queue.pop();
                }
                local_done = this->queue.empty() && this->done;
            }
            if (!buf.empty()) {
                out.write(buf.data(), std::streamsize(buf.size()));
                buf.clear();
            }
        }
        out.flush();
    }

public:
    async_buf(std::string const& name)
        : out(name)
        , buffer(128)
        , done(false)
        , thread(&async_buf::worker, this) {
        this->setp(this->buffer.data(),
                   this->buffer.data() + this->buffer.size() - 1);
    }
    ~async_buf() {
        std::unique_lock<std::mutex>(this->mutex), (this->done = true);
        this->condition.notify_one();
        this->thread.join();
    }
    int overflow(int c) {
        if (c != std::char_traits<char>::eof()) {
            *this->pptr() = std::char_traits<char>::to_char_type(c);
            this->pbump(1);
        }
        return this->sync() != -1
            ? std::char_traits<char>::not_eof(c): std::char_traits<char>::eof();
    }
    int sync() {
        if (this->pbase() != this->pptr()) {
            this->buffer.resize(std::size_t(this->pptr() - this->pbase()));
            {
                std::unique_lock<std::mutex> guard(this->mutex);
                this->queue.push(std::move(this->buffer));
            }
            this->condition.notify_one();
            this->buffer = std::vector<char>(128);
            this->setp(this->buffer.data(),
                       this->buffer.data() + this->buffer.size() - 1);
        }
        return 0;
    }

};

int main()
{
    async_buf    sbuf("async.out");
    std::ostream astream(&sbuf);
    std::ifstream in("async_stream.cpp");
    for (std::string line; std::getline(in, line); ) {
        astream << line << '\n' << std::flush;
    }
}

2
AndrewSpott:使用文件流的默认设置,它是带缓冲区的。您可以通过调用 stream.rdbuf()->setbuf(0, 0) 来禁用文件流的缓冲区。 - Dietmar Kühl
1
@zangw:当缓冲区已满时,它应该自动刷新:当写入一个字符时,如果缓冲区中没有足够的空间,则会调用overflow()[基于流所知道的:还有一个字符空间可以将参数粘贴到overflow()中]。如果您想在缓冲区未满的情况下发送数据,则需要进行flush操作(当流被销毁时,它将进行flush操作)。上面的实现将数据分成128字节的单元。当然,常量可以更改(我还没有对代码进行分析,以确定哪个大小最合适)。 - Dietmar Kühl
2
@zangw:当然可以。上面的类设计会写入缓冲区,并在其满时将其移交给另一个线程,使用新的缓冲区进行写入。可以有一个可用缓冲区队列(如果没有可用缓冲区,则创建一个新的),但这并未实现。您还可以在overflow()中增加缓冲区大小,并仅在显式刷新时发送它(即调用sync()时)。 - Dietmar Kühl
1
@zwang:接下来,您似乎将这个网站与一些免费劳动力的来源混淆了,但实际上它并不是。如果您想让别人审查您的代码,您需要将其放在 codereview 上,可能会在此处发表评论以引起对您的代码审查感兴趣的人们的注意。如果您有关于某些功能如何工作的具体问题,您可以在此处提问。 - Dietmar Kühl
1
@zangw:关于上面提出的具体问题:你应该创建一个问题,而不是在评论中提问。..简短的答案是:你是否可以使用一个缓冲区与流缓冲区关系不大,而是与你如何在两个线程之间同步访问缓冲区有关。如果你确保线程不会触及同一缓冲区中的相同字节,那么事情就没问题了。如果你最终在一个线程中写入一个字节,而在另一个未同步访问的线程中访问,则行为未定义。 - Dietmar Kühl
显示剩余11条评论

3

在网上搜索“双缓冲”。

一般来说,一个线程将写入一个或多个缓冲区。另一个线程从缓冲区中读取,“追踪”写入线程。

这可能不会使您的程序更有效率。对于文件而言,通过以巨大的块写入文件来实现效率,这样驱动器就不会有机会停转。一次写入多个字节比多次写入少量字节更有效率。

这可以通过仅当缓冲区内容超过1k的某个阈值时,写入线程才进行写入来实现。

还要研究“打印流程”或“打印流程池”的主题。

需要使用C++11,因为之前的版本没有标准库中的线程支持。我不知道为什么限制自己,因为Boost中有一些很好的东西。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接