C++低延迟线程异步缓冲流(用于日志记录)- Boost

6

问题:

下面的3个while循环包含已经注释掉的代码。我搜索了("TAG1", "TAG2"和"TAG3")以便更容易地识别它们。我只想让while循环在条件为真时等待,同时尽可能地减少CPU资源的使用。我首先尝试使用Boost条件变量,但存在竞态条件。将线程休眠"x"微秒是低效的,因为没有办法精确计时唤醒。最后,boost::this_thread::yield()似乎没有做任何事情。可能是因为我在双核系统上只有2个活动线程。具体来说,我如何使下面标记的三个区域更有效地运行,同时尽可能地少引入不必要的阻塞。

背景

目标:

我有一个记录大量数据的应用程序。在进行性能分析后,我发现很多时间都花费在记录操作上(将文本或二进制记录到本地硬盘上的文件)。我的目标是通过用调用线程化的缓冲流记录器替换非线程化的直接写入调用来减少logData调用的延迟。

已探索的选项:

  • 升级2005年代慢速硬盘到SSD...可能。成本不是禁止性的...但涉及很多工作...需要升级超过200台计算机...
  • Boost ASIO...我不需要所有的proactor/网络开销,寻找更简单、更轻量级的东西。

设计:

  • 生产者和消费者线程模式,应用程序将数据写入缓冲区,后台线程稍后将其写入磁盘。因此,最终目标是使由应用层调用的writeMessage函数尽可能快地返回,同时数据以FIFO顺序正确/完全记录到日志文件中。
  • 仅一个应用程序线程,仅一个写入线程。
  • 基于环形缓冲区。做出这个决定的原因是使用尽可能少的锁,并且理想情况下...如果我错了,请纠正我...我不认为我需要任何锁。
  • 缓冲区是静态分配的字符数组,但出于性能原因,如果需要/希望,可以将其移动到堆中。
  • 缓冲区具有指向应写入文件的下一个字符的起始指针。缓冲区具有指向应写入文件的最后一个字符后的数组索引的结束指针。结束指针永远不会超过开始指针。如果消息大于缓冲区,则写入器等待缓冲区被清空,并直接将新消息写入文件而不将超大消息放入缓冲区(一旦缓冲区被清空,工作线程将不会写入任何内容,因此没有争用)。
  • 写入器(工作线程)仅更新环形缓冲区的起始指针。
  • 主要的(应用程序线程)仅更新环形缓冲区的结束指针,并且仅在有可用空间时才将新数据插入缓冲区...否则它会等待缓冲区中的空间变得可用或按上述方式直接写入。
  • 工作线程不断检查是否有要写入的数据(表示为缓冲区起始指针!=缓冲区结束指针的情况)。如果没有要写入的数据,则工作线程最好进入睡眠状态,并在应用程序线程插入某些内容(并更改缓冲区的结束指针,使其不再指向与开始指针相同的索引)后唤醒。我下面的内容涉及while循环不断检查该条件。这是一种非常糟糕/低效的等待缓冲区的方式。

结果:

  • 在我的2009年双核笔记本电脑上使用SSD,我发现线程/缓冲基准测试的总写入时间与直接写入大约为1:6(0.609秒与0.095秒),但高度可变。通常,缓冲写入基准测试实际上比直接写入慢。我认为可变性是由于等待缓冲区释放空间的不良实现、等待缓冲区为空以及工作线程等待工作可用性的问题。我已经测量了一些while循环消耗超过10000个周期,并且怀疑这些周期实际上正在竞争其他线程(工作或应用程序)需要完成等待计算所需的硬件资源。
  • 输出似乎检查通过。在启用TEST模式和缓冲区大小为10的压力测试中,我对数百MB的输出进行了差异化比较,并发现它与输入相等。

与当前版本的Boost(1.55)兼容

标题

    #ifndef BufferedLogStream_h
    #define BufferedLogStream_h

    #include <stdio.h>
    #include <iostream>
    #include <iostream>
    #include <cstdlib>
    #include "boost\chrono\chrono.hpp"
    #include "boost\thread\thread.hpp"
    #include "boost\thread\locks.hpp"
    #include "boost\thread\mutex.hpp"
    #include "boost\thread\condition_variable.hpp"
    #include <time.h>

    using namespace std;

    #define BENCHMARK_STR_SIZE 128
    #define NUM_BENCHMARK_WRITES 524288
    #define TEST 0
    #define BENCHMARK 1
    #define WORKER_LOOP_WAIT_MICROSEC 20
    #define MAIN_LOOP_WAIT_MICROSEC 10

    #if(TEST)
    #define BUFFER_SIZE 10 
    #else 
    #define BUFFER_SIZE 33554432 //4 MB
    #endif

    class BufferedLogStream {
        public:
            BufferedLogStream();
            void openFile(char* filename);
            void flush();
            void close();
            inline void writeMessage(const char* message, unsigned int length);
            void writeMessage(string message);
            bool operator() () { return start != end; }

        private:
            void threadedWriter();
            inline bool hasSomethingToWrite();
            inline unsigned int getFreeSpaceInBuffer();
            void appendStringToBuffer(const char* message, unsigned int length);

            FILE* fp;
            char* start;
            char* end;
            char* endofringbuffer;
            char ringbuffer[BUFFER_SIZE];
            bool workerthreadkeepalive;
            boost::mutex mtx;
            boost::condition_variable waitforempty;
            boost::mutex workmtx;
            boost::condition_variable waitforwork;

            #if(TEST)
            struct testbuffer {
                int length;
                char message[BUFFER_SIZE * 2];
            };

            public:
                void test();

            private:
                void getNextRandomTest(testbuffer &tb);
                FILE* datatowrite;
            #endif

        #if(BENCHMARK)
            public:
                void runBenchmark();

            private:
                void initBenchmarkString();
                void runDirectWriteBaseline();
                void runBufferedWriteBenchmark();

                char benchmarkstr[BENCHMARK_STR_SIZE];
        #endif
    };

    #if(TEST)
    int main() {
        BufferedLogStream* bl = new BufferedLogStream();
        bl->openFile("replicated.txt");
        bl->test();
        bl->close();
        cout << "Done" << endl;
        cin.get();
        return 0;
    }
    #endif

    #if(BENCHMARK)
    int main() {
        BufferedLogStream* bl = new BufferedLogStream();
        bl->runBenchmark();
        cout << "Done" << endl;
        cin.get();
        return 0;
    }
    #endif //for benchmark

    #endif

实施

    #include "BufferedLogStream.h"

    BufferedLogStream::BufferedLogStream() {
        fp = NULL;
        start = ringbuffer;
        end = ringbuffer;
        endofringbuffer = ringbuffer + BUFFER_SIZE;
        workerthreadkeepalive = true;
    }

    void BufferedLogStream::openFile(char* filename) {
        if(fp) close();
        workerthreadkeepalive = true;
        boost::thread t2(&BufferedLogStream::threadedWriter, this);
        fp = fopen(filename, "w+b");
    }

    void BufferedLogStream::flush() {
        fflush(fp); 
    }

    void BufferedLogStream::close() {
        workerthreadkeepalive = false;
        if(!fp) return;
        while(hasSomethingToWrite()) {
            boost::unique_lock<boost::mutex> u(mtx);
            waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
        }
        flush();        
        fclose(fp);             
        fp = NULL;          
    }

    void BufferedLogStream::threadedWriter() {
        while(true) {
            if(start != end) {
                char* currentend = end;
                if(start < currentend) {
                    fwrite(start, 1, currentend - start, fp);
                }
                else if(start > currentend) {
                    if(start != endofringbuffer) fwrite(start, 1, endofringbuffer - start, fp); 
                    fwrite(ringbuffer, 1, currentend - ringbuffer, fp);
                }
                start = currentend;
                waitforempty.notify_one();
            }
            else { //start == end...no work to do
                if(!workerthreadkeepalive) return;
                boost::unique_lock<boost::mutex> u(workmtx);
                waitforwork.wait_for(u, boost::chrono::microseconds(WORKER_LOOP_WAIT_MICROSEC));
            }
        }
    }

    bool BufferedLogStream::hasSomethingToWrite() {
        return start != end;
    }

    void BufferedLogStream::writeMessage(string message) {
        writeMessage(message.c_str(), message.length());
    }

    unsigned int BufferedLogStream::getFreeSpaceInBuffer() {
        if(end > start) return (start - ringbuffer) + (endofringbuffer - end) - 1;
        if(end == start) return BUFFER_SIZE-1;
        return start - end - 1; //case where start > end
    }

    void BufferedLogStream::appendStringToBuffer(const char* message, unsigned int length) {
        if(end + length <= endofringbuffer) { //most common case for appropriately-sized buffer
            memcpy(end, message, length);
            end += length;
        }
        else {
            int lengthtoendofbuffer = endofringbuffer - end;
            if(lengthtoendofbuffer > 0) memcpy(end, message, lengthtoendofbuffer);
            int remainderlength =  length - lengthtoendofbuffer;
            memcpy(ringbuffer, message + lengthtoendofbuffer, remainderlength);
            end = ringbuffer + remainderlength;
        }
    }

    void BufferedLogStream::writeMessage(const char* message, unsigned int length) {
        if(length > BUFFER_SIZE - 1) { //if string is too large for buffer, wait for buffer to empty and bypass buffer, write directly to file
            while(hasSomethingToWrite()); {
                boost::unique_lock<boost::mutex> u(mtx);
                waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
            }
            fwrite(message, 1, length, fp);
        }
        else {
            //wait until there is enough free space to insert new string
            while(getFreeSpaceInBuffer() < length) {
                boost::unique_lock<boost::mutex> u(mtx);
                waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
            }
            appendStringToBuffer(message, length);
        }
        waitforwork.notify_one();
    }

    #if(TEST)
        void BufferedLogStream::getNextRandomTest(testbuffer &tb) {
            tb.length = 1 + (rand() % (int)(BUFFER_SIZE * 1.05));
            for(int i = 0; i < tb.length; i++) {
                tb.message[i] = rand() % 26 + 65;
            }
            tb.message[tb.length] = '\n';
            tb.length++;
            tb.message[tb.length] = '\0';
        }

        void BufferedLogStream::test() {
            cout << "Buffer size is: " << BUFFER_SIZE << endl;
            testbuffer tb;
            datatowrite = fopen("orig.txt", "w+b");
            for(unsigned int i = 0; i < 7000000; i++) {
                if(i % 1000000 == 0) cout << i << endl;
                getNextRandomTest(tb);
                writeMessage(tb.message, tb.length);
                fwrite(tb.message, 1, tb.length, datatowrite);
            }       
            fflush(datatowrite);
            fclose(datatowrite);
        }
    #endif

    #if(BENCHMARK) 
        void BufferedLogStream::initBenchmarkString() {
            for(unsigned int i = 0; i < BENCHMARK_STR_SIZE - 1; i++) {
                benchmarkstr[i] = rand() % 26 + 65;
            }
            benchmarkstr[BENCHMARK_STR_SIZE - 1] = '\n';
        }

        void BufferedLogStream::runDirectWriteBaseline() {
            clock_t starttime = clock();
            fp = fopen("BenchMarkBaseline.txt", "w+b");
            for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
                fwrite(benchmarkstr, 1, BENCHMARK_STR_SIZE, fp);
            }   
            fflush(fp);
            fclose(fp);
            clock_t elapsedtime = clock() - starttime;
            cout << "Direct write baseline took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
        }

        void BufferedLogStream::runBufferedWriteBenchmark() {
            clock_t starttime = clock();
            openFile("BufferedBenchmark.txt");
            cout << "Opend file" << endl;
            for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
                writeMessage(benchmarkstr, BENCHMARK_STR_SIZE);
            }   
            cout << "Wrote" << endl;
            close();
            cout << "Close" << endl;
            clock_t elapsedtime = clock() - starttime;
            cout << "Buffered write took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
        }

        void BufferedLogStream::runBenchmark() {
            cout << "Buffer size is: " << BUFFER_SIZE << endl;
            initBenchmarkString();
            runDirectWriteBaseline();
            runBufferedWriteBenchmark();
        }
    #endif

更新:2013年11月25日
我更新了下面的代码,使用了boost::condition_variables,具体来说是使用了wait_for()方法,正如Evgeny Panasyuk所建议的那样。这避免了不必要地反复检查相同的条件。目前,我看到缓冲版本的运行时间约为未缓冲 / 直接写入版本的1/6。这不是理想情况,因为两种情况都受硬盘限制(在我的情况下是2010年的SSD)。我计划在硬盘不会成为瓶颈的环境中使用下面的代码,大多数(如果不是全部)时间,缓冲区应该有可用空间来容纳writeMessage请求。这就带来了我的下一个问题。我应该把缓冲区设置多大?我不介意分配32 MB或64 MB以确保它永远不会填满。代码将在可以承受的系统上运行。直觉上,我觉得静态分配32 MB字符数组是个坏主意。是吗?无论如何,我预计当我为我的预期应用程序运行下面的代码时,logData()调用的延迟将大大降低,从而在总处理时间上产生显着的减少。
如果有人发现下面的代码有更好的方法(更快、更强大、更精简等),请告诉我。我感激反馈。Lazin,你的方法如何比我下面发布的方法更快或更有效?我有点喜欢只有一个缓冲区并使其足够大,以便它几乎永远不会填满的想法。然后我就不必担心从不同的缓冲区读取了。Evgeny Panasyuk,我喜欢在可能的情况下使用现有代码,特别是如果它是现有的boost库。然而,我也看不出spcs_queue比我下面的方法更有效。我宁愿处理一个大缓冲区而不是许多较小的缓冲区,并且必须担心在输入上将我的输入流分裂,并在输出上将其拼接回来。您的方法将允许我将格式化从主线程转移到工作线程。这是一种聪明的方法。但我还不确定它是否会节省很多时间,并且要实现全部效益,我必须修改我不拥有的代码。

2
你写道“在进行分析后,我发现许多时间用于日志记录操作”,你需要将时间分解成格式化和写入两个部分。 - Maxim Egorushkin
考虑使用 boost::lockfree::spsc_queue - 这是一个无等待的单生产者单消费者队列。它可以配置为具有编译时容量(内部环形缓冲区的大小)。 - Evgeny Panasyuk
我也刚发现这个链接: http://www.boost.org/doc/libs/1_54_0/doc/html/atomic/usage_examples.html#boost_atomic.usage_examples.example_ringbuffer问题不在于环形缓冲区与spcs_queue的比较。我的问题是如何让工作线程在有工作要做时立即开始工作,并在没有工作时休眠。我认为我可能需要建立一个“导演”来协调。首先,我将使2个缓冲指针成为原子性。我假设由于每个指针只会被一个线程或另一个线程修改,但永远不会同时修改,因此操作将是原子性的。 - 486DX2-66
@Yegorushkin:我想尽可能减少格式化和写入时间,所以我计划将两者都减少。上面的基准测试显示,仅由于写入部分(上面使用的fwrite不进行格式化,只直接将字符写入文件),延迟至少减少了6倍(一旦我找到修复while循环的方法,我认为这个比例将超过一个数量级)。此外,在应用程序级别进行的格式化非常基本...不执行对象序列化或任何复杂操作。主要是Int2Char和Double2Char。 - 486DX2-66
还发现这个链接很有帮助:http://www.csd.uwo.ca/~moreno/HPC-Slides/Synchronizing_without_Locks.pdf - 486DX2-66
显示剩余2条评论
2个回答

6

通用解决方案。

我认为您必须看一下Nagle算法。对于一个生产者和一个消费者,它应该是这样的:

  • 开始时缓冲区为空,工作线程处于空闲状态并等待事件。
  • 生产者将数据写入缓冲区并通知工作线程。
  • 工作线程被唤醒并开始写操作。
  • 生产者尝试写另一条消息,但缓冲区正在被工作线程使用,因此生产者分配另一个缓冲区并将消息写入其中。
  • 生产者尝试写另一条消息,I/O仍在进行中,因此生产者将消息写入先前分配的缓冲区。
  • 工作线程完成将缓冲区写入文件,并发现有另一个带有数据的缓冲区,因此它抓住它并开始写入。
  • 第一个缓冲区由生产者用于写入所有连续的消息,直到第二个写操作正在进行。

这个模式将有助于实现低延迟要求,单个消息将立即写入磁盘,但大量事件将通过大批量写入以获得更高的吞吐量。

如果您的日志消息具有级别-您可以稍微改进这个模式。所有错误消息都具有高优先级(级别)并且必须立即保存到磁盘上(因为它们很少但非常有价值),但是调试和跟踪消息具有低优先级,并且可以缓冲以节省带宽(因为它们非常频繁,但不像错误和信息消息那样有价值)。因此,当您写入error消息时,必须等待工作线程完成写入您的消息(以及在同一缓冲区中的所有消息),然后继续,但是调试和跟踪消息可以直接写入缓冲区。

线程。

为每个应用程序线程生成工作线程的成本太高了。您必须为每个日志文件使用单个写入器线程。写入缓冲区必须在线程之间共享。每个缓冲区必须有两个指针-commit_pointerprepare_pointer。从缓冲区开头到commit_pointer之间的所有缓冲区空间都可供工作线程使用。commit_pointerprepare_pointer之间的缓冲区空间由应用程序线程当前更新。不变式:commit_pointer<=prepare_pointer

写操作可以分为两个步骤。

  1. 准备写入。此操作在缓冲区中保留空间。
    • 生产者计算消息的长度并原子性地更新prepare_pointer
    • 旧的prepare_pointer值和长度由消费者保存;
  2. 提交写入。
    • 生产者在保留的缓冲区空间(旧的prepare_pointer值)开头写入消息。
    • 生产者忙等待,直到commit_pointer等于本地变量保存的旧prepare_pointer值。
    • 通过原子方式执行commit_pointer = commit_pointer + len来提交写入操作。

为了防止虚假共享,可以将len(message)舍入到缓存行大小,并用空格填充所有额外的空间。

// pseudocode
void write(const char* message) {
    int len = strlen(message);  // TODO: round to cache line size
    const char* old_prepare_ptr;
    // Prepare step
    while(1) 
    {
        old_prepare_ptr = prepare_ptr;
        if (
            CAS(&prepare_ptr, 
                 old_prepare_ptr, 
                 prepare_ptr + len) == old_prepare_ptr
            )
            break;
        // retry if another thread perform prepare op.
    }
    // Write message
    memcpy((void*)old_prepare_ptr, (void*)message, len);
    // Commit step
    while(1)
    {
        const char* old_commit_ptr = commit_ptr;
        if (
             CAS(&commit_ptr, 
                  old_commit_ptr, 
                  old_commit_ptr + len) == old_commit_ptr
            )
            break;
        // retry if another thread commits
    }
    notify_worker_thread();
}

Lazin,感谢您的建议。上述方法看起来很可靠。唯一的缺点是需要管理两个缓冲区而不是一个,并且在两个缓冲区都已满且工作线程仍在写入时,应用程序线程仍将等待。否则,会分配新的缓冲区,我希望在构建缓冲流之后避免内存分配。我认为您在CAS部分是正确的...也许我不应该在上面的设计中省略它。 - 486DX2-66
您不仅限于两个缓冲区,还可以创建有界缓冲区队列并预先分配它们(当然 - 可以重复使用它们)。如果您的文件写入线程无法写入所有数据,则需要稍微减慢生产者线程的速度。这实际上是一种需要防止内存错误的反压力。 - Evgeny Lazin

3

concurrent_queue<T, Size>

我的问题是如何使工作线程在有工作要做时立即开始工作,并在没有工作可做时休眠。

这里有一个 boost::lockfree::spsc_queue - 无等待单生产者单消费者队列。它可以配置为具有编译时容量(内部环形缓冲区的大小)。

据我所知,您想要类似于以下配置:

template<typename T, size_t N>
class concurrent_queue
{
    // T can be wrapped into struct with padding in order to avoid false sharing
    mutable boost::lockfree::spsc_queue<T, boost::lockfree::capacity<N>> q;
    mutable mutex m;
    mutable condition_variable c;

    void wait() const
    {
        unique_lock<mutex> u(m);
        c.wait_for(u, chrono::microseconds(1)); // Or whatever period you need.
        // Timeout is required, because modification happens not under mutex
        //     and notification can be lost.
        // Another option is just to use sleep/yield, without notifications.
    }
    void notify() const
    {
        c.notify_one();
    }
public:
    void push(const T &t)
    {
        while(!q.push(t))
            wait();
        notify();
    }
    void pop(T &result)
    {
        while(!q.pop(result))
            wait();
        notify();
    }
};

当队列中有元素时,pop 操作不会阻塞。当内部缓冲区有足够的空间时,push 操作也不会阻塞。

concurrent<T>

我希望尽可能地减少格式和写作时间,所以我计划同时减少这两个方面。

看看 Herb SutterC++ and Beyond 2012 上的演讲:C++ 并发性。在第 14 页,他展示了 concurrent<T> 的示例。基本上,它是对类型为 T 的对象进行包装,在该对象上执行所有操作的单独线程。使用方法如下:

concurrent<ostream*> x(&cout); // starts thread internally
// ...
// x acts as function object.
// It's function call operator accepts action
//   which is performed on wrapped object in separate thread.
int i = 42;
x([i](ostream *out){ *out << "i=" << i; }); // passing lambda as action

您可以使用类似的模式来将所有格式化工作都转移到消费者线程上进行处理。

小对象优化

否则,在构造缓冲区流之后,会分配新的缓冲区,我希望避免内存分配。

上述concurrent_queue<T, Size>示例使用固定大小的缓冲区,它完全包含在队列中,并且不会产生额外的分配。

然而,Herb的concurrent<T>示例使用std::function将操作传递到工作线程中。这可能会导致昂贵的分配。

std::function实现可以使用小对象优化(大多数实现都是如此)-小函数对象在内部缓冲区中进行原地复制构造,但并不保证对于大于阈值的函数对象-会发生堆分配。

有几种选项可避免此分配:

  1. 实现具有足够大的内部缓冲区以容纳目标函数对象的std::function模拟(例如,您可以尝试修改boost::function版本)。

  2. 使用自己的函数对象,它将表示所有类型的日志消息。基本上,它只包含格式化消息所需的值。由于可能有不同类型的消息,请考虑使用boost::variant(这是与类型标记配对的文字联合)来表示它们。

将所有内容放在一起,这是概念证明(使用第二个选项):

LIVE DEMO

#include <boost/lockfree/spsc_queue.hpp>
#include <boost/optional.hpp>
#include <boost/variant.hpp>

#include <condition_variable>
#include <iostream>
#include <cstddef>
#include <thread>
#include <chrono>
#include <mutex>

using namespace std;

/*********************************************/
template<typename T, size_t N>
class concurrent_queue
{
    mutable boost::lockfree::spsc_queue<T, boost::lockfree::capacity<N>> q;
    mutable mutex m;
    mutable condition_variable c;

    void wait() const
    {
        unique_lock<mutex> u(m);
        c.wait_for(u, chrono::microseconds(1));
    }
    void notify() const
    {
        c.notify_one();
    }
public:
    void push(const T &t)
    {
        while(!q.push(t))
            wait();
        notify();
    }
    void pop(T &result)
    {
        while(!q.pop(result))
            wait();
        notify();
    }
};

/*********************************************/
template<typename T, typename F>
class concurrent
{
    typedef boost::optional<F> Job;

    mutable concurrent_queue<Job, 16> q; // use custom size
    mutable T x;
    thread worker;

public:
    concurrent(T x)
        : x{x}, worker{[this]
        {
            Job j;
            while(true)
            {
                q.pop(j);
                if(!j) break;
                (*j)(this->x); // you may need to handle exceptions in some way
            }
        }}
    {}
    void operator()(const F &f)
    {
        q.push(Job{f});
    }
    ~concurrent()
    {
        q.push(Job{});
        worker.join();
    }
};

/*********************************************/
struct LogEntry
{
    struct Formatter
    {
        typedef void result_type;
        ostream *out;

        void operator()(double x) const
        {
            *out << "floating point: " << x << endl;
        }
        void operator()(int x) const
        {
            *out << "integer: " << x << endl;
        }
    };
    boost::variant<int, double> data;

    void operator()(ostream *out)
    {
        boost::apply_visitor(Formatter{out}, data);
    }
};

/*********************************************/
int main()
{
    concurrent<ostream*, LogEntry> log{&cout};

    for(int i=0; i!=1024; ++i)
    {
        log({i});
        log({i/10.});
    }
}

c) 关于等待应用线程在满队列上的情况:无论如何都应该处理这种情况。使用无界队列只会掩盖问题——队列仍然不能无限增长。如果生产者制造的值比消费者处理它们的速度快得多,最终将耗尽全部内存。有不同的选择,例如:减缓生产者(即像我们的示例中一样阻止它),删除旧的未处理元素,删除当前元素。d) 关于唤醒工作线程:请注意,当有元素可用时,可以接收通知并唤醒。 - Evgeny Panasyuk
关于格式化 - 这是对OP的问题,我不知道他做了什么样的格式化 - 他只是想卸载所有操作。 "这将导致过多的数据复制。" - 好吧,这取决于情况。如果我们只发送16个字节的值,应该将其格式化为100个字符的字符串 - 那么在应用程序线程中分配或复制这样的字符串会更慢。关于ostream - 显然这只是为了自包含和工作演示而使用(是的,iostreams很慢),当然OP不会登录到std :: cout,并且可以轻松使用其他类型的接收器来使用所描述的方法。 - Evgeny Panasyuk
如果他使用C风格的格式化,那么格式化字符串必须与参数一起复制。 - Evgeny Lazin
只有在某些情况下才能使用。格式字符串可以与适当类型的消息相关联(请参见我的示例中的LogEntry代码,我为不同类型使用了不同的格式)。或者,如果使用sso_function,则为[value](Out &o) { o << "format string " << value; } - Evgeny Panasyuk
目前,我们将字符串串联在一起,即使有数千个字符,因为这样做比写入数百次更便宜。然而,如果新的日志流几乎立即返回(当缓冲区未满时的memcpy速度),那么我可以更频繁地向流写入,避免串联流和字符串以及伴随其执行的内存分配。然后,我还避免了调用流或C++字符串上的.str()方法和分配新字符数组的步骤...所以我认为这将节省大部分时间。 - 486DX2-66
显示剩余9条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接