C++多线程解码音频数据

3
我需要尽可能快地使用Opus解码器解码音频数据。
目前我的应用程序不够快。虽然解码速度已经很快了,但我需要更快的速度。
我需要解码大约100个音频部分。这些部分不是连续的(它们彼此之间没有关联)。
我考虑使用多线程,这样我就不必等待100次解码中的任何一次完成。在我的梦里,我可以同时启动所有的解码。我以前没有使用过多线程技术。
因此,我想问一下,我的做法通常是否正确或者是否存在思路上的错误。
谢谢。

@MarkGarcia,你能告诉我该怎么做吗?解码器必须是单独的exe文件吗?还是我可以在我的主应用程序中实现它? - tmighty
你可以查看 基于任务的并行性 或者 parallel::for_each,也可以使用 tbb 来实现相同的功能。 - Dmitry Ledentsov
2
这100个音频部分是在磁盘上吗?有多少时间只是用来读/写磁盘?由于磁盘无法并行读取,因此多线程对于与磁盘相关的工作部分没有帮助。 - ScottMcP-MVP
3
你的问题现在过于模糊。我认为最好的做法是更详细地阐述数据格式和架构,以及你梦想中究竟希望看到的是什么。如果不是因为丰厚的奖金,这个问题可能已经因为不够具体而被标记为关闭。有趣的是这样行得通的 :P - Preet Kukreti
在考虑多线程之前,请确保您正在使用非阻塞、多路复用的边缘触发轮询。 - Kerrek SB
显示剩余4条评论
6个回答

3
这个答案可能需要社区的进一步完善,因为我已经很久没有在这个环境中工作了,但是以下是一个开始的例子 -
由于您是C++多线程编程的新手,建议从一个简单的项目开始,创建一堆pthread来执行简单的任务。
这里有一个创建pthread的快速小例子:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

void* ThreadStart(void* arg);

int main( int count, char** argv) {
        pthread_t thread1, thread2;

        int* threadArg1 = (int*)malloc(sizeof(int));
        int* threadArg2 = (int*)malloc(sizeof(int));

        *threadArg1 = 1;
        *threadArg2 = 2;

        pthread_create(&thread1, NULL, &ThreadStart, (void*)threadArg1 );
        pthread_create(&thread2, NULL, &ThreadStart, (void*)threadArg2 );

        pthread_join(thread1, NULL);
        pthread_join(thread2, NULL);
        free(threadArg1);
        free(threadArg2);
}

void* ThreadStart(void* arg) {

        int threadNum = *((int*)arg);
        printf("hello world from thread %d\n", threadNum);

        return NULL;
}

接下来,您将使用多个Opus解码器。Opus似乎是线程安全的,只要为每个线程创建单独的OpusDecoder对象即可。
为了向线程提供工作任务,您需要一个可以以线程安全方式访问的待处理工作单元列表。您可以使用std::vector或std::queue,但在添加和删除时需要使用锁,并且您需要使用计数信号量,以便线程会阻塞但保持活动状态,而您则逐渐向队列中添加工作单位(例如从磁盘读取的文件缓冲区)。
以下是一些类似上面的示例代码,演示了如何使用共享队列,以及如何使线程等待,直到您填充队列:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <queue>
#include <semaphore.h>
#include <unistd.h>

void* ThreadStart(void* arg);

static std::queue<int> workunits;
static pthread_mutex_t workunitLock;
static sem_t workunitCount;

int main( int count, char** argv) {
    pthread_t thread1, thread2;

    pthread_mutex_init(&workunitLock, NULL);
    sem_init(&workunitCount, 0, 0);

    pthread_create(&thread1, NULL, &ThreadStart, NULL);
    pthread_create(&thread2, NULL, &ThreadStart, NULL);

    // Make a bunch of workunits while the threads are running.
    for (int i = 0; i < 200; i++ ){
        pthread_mutex_lock(&workunitLock);

        workunits.push(i);
        sem_post(&workunitCount);

        pthread_mutex_unlock(&workunitLock);

        // Pretend that it takes some effort to create work units;
        // this shows that the threads really do block patiently
        // while we generate workunits.
        usleep(5000);
    }

    // Sometime in the next while, the threads will be blocked on
    // sem_wait because they're waiting for more workunits. None
    // of them are quitting because they never saw an empty queue.
    // Pump the semaphore once for each thread so they can wake
    // up, see the empty queue, and return.

    sem_post(&workunitCount);
    sem_post(&workunitCount);

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    pthread_mutex_destroy(&workunitLock);
    sem_destroy(&workunitCount);

}

void* ThreadStart(void* arg) {

    int workUnit;
    bool haveUnit;

    do{
        sem_wait(&workunitCount);

        pthread_mutex_lock(&workunitLock);

        // Figure out if there's a unit, grab it under
        // the lock, then release the lock as soon as we can.
        // After we release the lock, then we can 'process'
        // the unit without blocking everybody else.
        haveUnit = !workunits.empty();

        if ( haveUnit ) {
            workUnit = workunits.front();
            workunits.pop();
        }
        pthread_mutex_unlock(&workunitLock);

        // Now that we're not under the lock, we can spend
        // as much time as we want processing the workunit.
        if ( haveUnit ) {
            printf("Got workunit %d\n", workUnit);
        }
    } 
    while(haveUnit);

    return NULL;
}

对于所有的编辑,我很抱歉 :) - antiduh

3
你需要按任务分解你的工作。假设你的进程实际上是 CPU 绑定的(你表示它是,但通常情况并不那么简单)。
目前,你解码了 100 个部分:
“我在考虑使用多线程,这样我就不必等待其中一个 100 个解码完成。在我的梦想中,我可以同时启动所有东西。”
实际上,你应该使用接近计算机核心数的数字。
假设现代台式机(例如 2-8 核心),一次运行 100 个线程只会减慢速度;内核将浪费很多时间在不同的线程之间切换,进程也可能使用更高的峰值资源并争夺相似的资源。
因此,只需创建一个任务池,将活动任务的数量限制为核心数。每个任务通常代表要执行的一个输入文件(部分)的解码工作。这样,解码过程实际上并没有跨多个线程共享数据(使您避免锁定和其他资源争用)。
完成后,返回并微调任务池中的进程数(例如使用完全相同的输入和多台机器上的秒表)。最快的可能比核心数低或高(最有可能是因为磁盘 I/O)。进行分析也有所帮助。
“因此,我想问一下我的方法是否一般正常,或者是否存在思维错误。”
是的,如果问题是 CPU 绑定的,那么通常是可以的。这还假设你的解码器/依赖软件能够使用多个线程运行。
你会发现,如果这些是磁盘上的文件,你将需要优化如何从许多核心读取(和写入)文件。因此,允许它同时运行 8 个作业可能会使你的问题变成磁盘绑定 - 同时进行 8 个读取器/写入器是使用硬盘的不良方式,因此你可能会发现它并不像你期望的那样快。因此,您可能需要为并发解码实现优化 I/O。在这方面,使用较大的缓冲区大小,但这会带来内存成本。

1
好的答案,尽管通常你应该拥有的线程数是(机器上核心数量)* 2。 - Adrian
1
线程数 = n * 2?通常我使用以下规则:当 n <= 2 时,线程数 = n;当 2 < n <= 10 时,线程数 = n+1;当 n > 10 时,线程数 = n * 1.2 - antiduh
1
请参考以下链接:https://dev59.com/U2Yr5IYBdhLWcg3wQH--#13958877(最好在您的CPU和任务上进行一些测试;有些处理器每个核心可以处理8个线程,有些可以处理40个线程) - Adrian
1
@justin - 我同意你的观点,有很多变量需要考虑。我指的是对于纯CPU密集型代码而言。对于这种特定情况,我更倾向于建议使用一个管道,其中1-2个线程读取文件,N * fudge个线程处理缓冲区,1-2个线程写出文件。这样可以减少总线程数,并且通常与磁盘更加兼容 - 10个线程同时尝试写入10个文件可能会使硬盘到处寻找,浪费时间,而不是一次写入一两个文件。 - antiduh
1
@antiduh 是的,我从 CPU 受限的假设下开始使用 NCORES,并决定放弃它,因为在达到 NCORES*2 之前,I/O 经常成为一个问题。让硬盘同时处理多个位置可能是 NCORES/2 完成更快的一个例子,而不是 NCORES。由于不知道 OP 的编码或这些变量的更多信息,我无法做出好的建议 - 所以我只建议他/她从 NCORES 开始测量。 - justin
显示剩余2条评论

2

建议您使用线程池将解码任务交给线程池,不要自己创建和管理线程。线程池会根据系统的处理能力分配任务给尽可能多的线程。不同类型的线程池可以设置一些参数,比如强制使用特定数量的线程,或者是否允许线程池增加线程。

需要注意的是,线程数增多并不意味着它们会并行执行。正确的术语应该是同时执行,除非您保证每个线程在不同的CPU上运行(这才能实现真正的并行)

如果受阻于I/O操作,整个线程池也可能停止运行。


0

虽然你的问题有些含糊不清,但是怎么样考虑这个:

Create a list of audio files to convert.

While there is a free processor, 
   launch the decoder application with the next file in the queue.   
Repeat until there is nothing else in the list

如果在测试过程中发现处理器并不总是100%繁忙,可以每个处理器启动2个解码器。这可以很容易地通过一些bash/tcl/python代码完成。

0

一般情况下,您可以使用线程,但锁定会有一些问题。我将以POSIX线程和锁为基础回答,但这是相当通用的,您可以将其移植到任何平台上。但如果您的工作需要任何类型的锁定,则可能会发现以下内容有用。此外,最好一直使用现有的线程,因为线程创建成本很高(请参见线程池)。

对于“实时”音频来说,锁定通常不是一个好主意,因为它会增加延迟,但对于解码/编码的实时作业来说,它们完全可以,甚至对于实时作业,您可以通过使用一些线程知识获得更好的性能并且不会丢帧。

对于音频而言,信号量是一个糟糕的选择。当我尝试使用它们时(POSIX信号量),它们在我的系统上太慢了,但如果您考虑跨线程锁定(不是在一个线程中锁定并在同一线程中解锁的类型),则需要它们。 POSIX互斥锁仅允许自锁定和自解锁(您必须在同一线程中执行两者),否则程序可能会工作,但这是未定义的行为,应该避免。

大多数无锁原子操作可能会为您提供足够的自由,以使用某些功能(如锁定),但具有更好的性能。


0
在着手使用多线程来加速程序之前,先学习一下过度订阅和欠订阅的概念。
如果音频处理涉及到长时间的IO阻塞调用,则使用多线程是值得的。

你的观点似乎很有道理,但能不能说得更明白一些呢? - tmighty
我不是故意说话晦涩难懂的,如果需要详细说明某个特定点,请告诉我。 - Anand Rathi
你的意思是,如果速度缺失是由于磁盘访问(=IO)造成的,多线程不起作用(因为磁盘不能同时访问多次),但如果我确实在谈论CPU执行的解码,那么这可能值得一试? - tmighty
@tmighty,你在评论的第一部分是正确的。我不太理解你的第二部分。是的,只有CPU才会进行解码。你可能想说的是如果数据在内存中而不是磁盘中。 - Adrian
1
@Adrian 他的意思是,如果你访问磁盘,这需要很长时间,你的CPU就没有事可做了。与此同时,另一个线程可以解码音频。 - AlexWien
感谢 @AlexWien。确实,如果磁盘访问需要时间,则可以为另一个线程提供 CPU 周期,否则您的 CPU 将浪费时间调度线程而不是解码音频。 - Anand Rathi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接