线程I/O重排序缓冲区的标准术语是什么?

4
我有一个案例,其中许多线程同时生成数据,最终写入一个长的串行流。我需要以某种方式序列化这些写操作,以便流按正确顺序写入。
即,我有一个输入队列包含2048个作业j0..jn,每个作业产生一块数据oi。这些作业在八个线程上并行运行,但输出块必须按照相应输入块的顺序出现在流中——输出文件必须按照o0o1o2...的顺序排列。
解决这个问题很明显:我需要一种缓冲区来累积和按正确顺序写入输出块,类似于Tomasulo算法中的CPU重新排序缓冲区,或者TCP重新组装乱序数据包后将其传递给应用层的方式。
在我开始编写代码之前,我想快速进行文献搜索,看看是否有任何论文可以以特别聪明或高效的方式解决这个问题,因为我有严格的实时和内存限制。然而,我似乎找不到任何描述这种情况的论文;对于每种[线程、并发、重排序缓冲区、重新组装、io、序列化]的排列组合进行学术搜索并没有得到有用的结果。我觉得我可能只是没有使用正确的术语进行搜索。
这种模式是否有常见的学术名称或关键字,我可以搜索一下?
5个回答

1

0

将输出队列包含futures而不是实际数据。当您从输入队列中检索项目时,立即将相应的future发布到输出队列中(请注意确保保留顺序-请参见下文)。当工作线程处理完项目后,可以在future上设置值。输出线程可以从队列中读取每个future,并阻塞直到该future准备就绪。如果后面的future提前准备好,这对输出线程没有影响,只要futures按顺序排列。

有两种方法可以确保输出队列上的futures按正确顺序排列。第一种方法是使用一个互斥量来读取输入队列和写入输出队列。每个线程都会锁定互斥量,从输入队列中取出一个项目,将future发布到输出队列中,然后释放互斥量。

第二种方法是有一个单独的主线程从输入队列中读取,并将future发布到输出队列,然后将项目移交给工作线程执行。

在C ++中,使用单个互斥量保护队列,代码如下:

#include <thread>
#include <mutex>
#include <future>

struct work_data{};
struct result_data{};

std::mutex queue_mutex;
std::queue<work_data> input_queue;
std::queue<std::future<result_data> > output_queue;

result_data process(work_data const&); // do the actual work

void worker_thread()
{
    for(;;) // substitute an appropriate termination condition
    {
        std::promise<result_data> p;
        work_data data;
        {
            std::lock_guard<std::mutex> lk(queue_mutex);
            if(input_queue.empty())
            {
                continue;
            }
            data=input_queue.front();
            input_queue.pop();
            std::promise<result_data> item_promise;
            output_queue.push(item_promise.get_future());
            p=std::move(item_promise);
        }
        p.set_value(process(data));
    }
}

void write(result_data const&); // write the result to the output stream

void output_thread()
{
    for(;;) // or whatever termination condition
    {
        std::future<result_data> f;
        {
            std::lock_guard<std::mutex> lk(queue_mutex);
            if(output_queue.empty())
            {
                continue;
            }
            f=std::move(output_queue.front());
            output_queue.pop();
        }
        write(f.get());
    }
}

0

实际上,您不应该需要累积块。大多数操作系统和语言都提供了随机访问文件抽象,允许每个线程独立地将其输出数据写入文件的正确位置,而不会影响任何其他线程的输出数据。

或者,您是在写入真正的串行输出文件,例如套接字吗?


真正的串行——流密码。 - Crashworks
你的解决方案只有在处理完成之前已知输出记录的长度时才有效。 - Nick Johnson

0
我个人不会使用可重新排序的缓冲区。我会为每个作业创建一个“作业”对象,并根据您的环境,使用消息传递或互斥来按顺序接收每个作业的完成数据。如果下一个作业没有完成,您的“写入器”进程将等待直到它完成。

很抱歉,我不明白你的意思。你是说我应该有(n)个互斥量,每个任务一个,而写入者应按升序等待它们吗?问题在于,我只有足够的内存来容纳大约二十个作业,并且如果当前窗口恰好以相反的顺序完成,这将使得几个线程处于空闲状态,直到“头”线程完成。 - Crashworks
这正是我所建议的。如果任务按相反的顺序完成,除非使用Steve的建议(如果您的记录是已知长度)或将已完成的结果缓存到磁盘中,否则不会有任何其他更好的解决方案。 - Nick Johnson

0

我会使用一个环形缓冲区,其长度与您使用的线程数相同。该环形缓冲区还将具有相同数量的互斥锁。

环形缓冲区还必须知道它已写入文件的最后一块的id。这相当于您的环形缓冲区的0索引。

在添加到环形缓冲区时,您需要检查是否可以写入,即索引0已设置,然后可以一次向文件写入多个块。

如果索引0未设置,则仅锁定当前线程以等待。--您还可以使用比您的线程数长2-3倍的环形缓冲区,并在适当时(即:启动足够的作业以填充缓冲区时)才进行锁定。

不要忘记更新最后一块已写入的内容;)

在写入文件时,您还可以使用双缓冲。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接