如何在C语言中实现循环缓冲区?

83

我需要一个固定大小(可在运行时选择,而不是编译时)的循环缓冲区,可以容纳任何类型的对象,并且需要非常高的性能。我认为不会有资源争用问题,因为尽管它在多任务嵌入式环境中,但它是协作的,所以任务本身可以管理。

我的初步想法是在缓冲区中存储一个简单的结构体,其中包含类型(简单的枚举/定义)和指向负载的void指针,但我希望这样做尽可能快,因此我愿意接受绕过堆的建议。

实际上,我很乐意绕过任何标准库以获得原始速度-从我看到的代码来看,它并没有为 CPU 进行大量优化:它看起来只是编译了 C 代码用于像strcpy()之类的操作,没有手工编写的汇编代码。

非常感谢您提供任何代码或想法。所需的操作如下:

  • 创建指定大小的缓冲区。
  • 放置在尾部。
  • 从头获取。
  • 返回计数。
  • 删除缓冲区。

1
你需要一个循环缓冲区还是队列?所需的操作听起来像是队列。我承认,使用循环缓冲区来满足固定大小的要求是有道理的,但我不确定问题标题是否反映了你实际的问题。 - Logan Capaldo
1
如果您认为其他数据结构可以更快,我也愿意尝试,但我相当确定,在内存中固定大小的循环缓冲区将优于队列中项目的malloc/free。虽然我猜我仍然需要对有效负载进行malloc/free:如果我可以为项目和有效负载执行一次malloc,那可能是值得的。 - paxdiablo
如果你认为它们可以更快的话,我建议你进行基准测试。顺便问一下,你将什么定义为“非常高性能”? - Mitch Wheat
1
我将对所有想法进行基准测试(我拥有基于实际吞吐量的测试数据生成能力)。“高性能”将指代任何使当前的 CPU 能够处理客户最近增加的负载的东西 :-) - paxdiablo
2
我来澄清一下。我不需要四路最新英特尔高性能CPU的性能。它正在运行一个8051变体,这并不是最快的,所以我只是在寻找优化想法来测试。如果没有一个可行的方案,客户将不得不基于不同的CPU制作新的硬件,这不会便宜。当前队列中的项目处理已被确定为主要瓶颈。 - paxdiablo
9个回答

94

最简单的解决方案是跟踪项大小和项数,然后创建适当字节数的缓冲区:

typedef struct circular_buffer
{
    void *buffer;     // data buffer
    void *buffer_end; // end of data buffer
    size_t capacity;  // maximum number of items in the buffer
    size_t count;     // number of items in the buffer
    size_t sz;        // size of each item in the buffer
    void *head;       // pointer to head
    void *tail;       // pointer to tail
} circular_buffer;

void cb_init(circular_buffer *cb, size_t capacity, size_t sz)
{
    cb->buffer = malloc(capacity * sz);
    if(cb->buffer == NULL)
        // handle error
    cb->buffer_end = (char *)cb->buffer + capacity * sz;
    cb->capacity = capacity;
    cb->count = 0;
    cb->sz = sz;
    cb->head = cb->buffer;
    cb->tail = cb->buffer;
}

void cb_free(circular_buffer *cb)
{
    free(cb->buffer);
    // clear out other fields too, just to be safe
}

void cb_push_back(circular_buffer *cb, const void *item)
{
    if(cb->count == cb->capacity){
        // handle error
    }
    memcpy(cb->head, item, cb->sz);
    cb->head = (char*)cb->head + cb->sz;
    if(cb->head == cb->buffer_end)
        cb->head = cb->buffer;
    cb->count++;
}

void cb_pop_front(circular_buffer *cb, void *item)
{
    if(cb->count == 0){
        // handle error
    }
    memcpy(item, cb->tail, cb->sz);
    cb->tail = (char*)cb->tail + cb->sz;
    if(cb->tail == cb->buffer_end)
        cb->tail = cb->buffer;
    cb->count--;
}

5
非常标准的解决方案 - 正好符合原帖中提到的 OP 想要避免的规格。:P - Anthony
Adam,这个解决方案假设相同大小的项目必须始终占据缓冲区中的相同位置。我认为OP要求在缓冲区中的任何位置存储的任何给定输入都可以是6种不同大小的数据类型之一。这留下了两个选择。使用calloc()和realloc()为每个新到达的数据分配空间,或者分配足够大以容纳缓冲区中每个位置上最大数据类型的缓冲区。如果可行的话,后一种方法会更快更干净。 - user1899861
请问您能提供一种初始化循环缓冲区的方法吗? 使用以下代码: circular_buffer *cb; cb_init(cb, 20, 4); 我得到了这个错误:Process finished with exit code 139 (interrupted by signal 11: SIGSEGV)。 先感谢您,同时也抱歉在评论中写代码。 - Dimitrios Filippou
1
只是想添加一条注释,这个实现在多线程情况下不起作用,即生产者和消费者是单独的线程,因为两者都访问和修改了“count”。我们需要保护“count”。 - user138645

16
// Note power of two buffer size
#define kNumPointsInMyBuffer 1024 

typedef struct _ringBuffer {
    UInt32 currentIndex;
    UInt32 sizeOfBuffer;
    double data[kNumPointsInMyBuffer];
} ringBuffer;

// Initialize the ring buffer
ringBuffer *myRingBuffer = (ringBuffer *)calloc(1, sizeof(ringBuffer));
myRingBuffer->sizeOfBuffer = kNumPointsInMyBuffer;
myRingBuffer->currentIndex = 0;

// A little function to write into the buffer
// N.B. First argument of writeIntoBuffer() just happens to have the
// same as the one calloc'ed above. It will only point to the same
// space in memory if the calloc'ed pointer is passed to
// writeIntoBuffer() as an arg when the function is called. Consider
// using another name for clarity
void writeIntoBuffer(ringBuffer *myRingBuffer, double *myData, int numsamples) {
    // -1 for our binary modulo in a moment
    int buffLen = myRingBuffer->sizeOfBuffer - 1;
    int lastWrittenSample = myRingBuffer->currentIndex;

    int idx;
    for (int i=0; i < numsamples; ++i) {
        // modulo will automagically wrap around our index
        idx = (i + lastWrittenSample) & buffLen; 
        myRingBuffer->data[idx] = myData[i];
    }

    // Update the current index of our ring buffer.
    myRingBuffer->currentIndex += numsamples;
    myRingBuffer->currentIndex &= myRingBuffer->sizeOfBuffer - 1;
}
只要你的环形缓冲区长度是2的幂次方,那么使用快速的二进制"&"操作可以为您完成索引的循环。对于我的应用程序,我正在从麦克风获取音频,并显示给用户一个音频片段的环形缓冲区。
我总是确保屏幕上可以显示的音频最大量远小于环形缓冲区的大小。否则,您可能会从同一块中读取和写入数据,这可能会导致显示出奇怪的伪像。

我喜欢使用&符号和使用ringBuffer->sizeOfBuffer作为位掩码。我猜“奇怪的显示伪影”问题是由于在写入FIFO之前没有检查是否会覆盖尾部而进行写入造成的? - user1899861
1
我已经进行了一些测试,我认为在这个代码片段中,%和&两种方式是相同的:uint8_t tmp1,tmp2; tmp1 = (34 + 1) & 31; tmp2 = (35 ) % 32; printf("%d %d",tmp1,tmp2);那么,它们之间的实际区别是什么,还是只是编码风格的不同? - R1S8K
@R1S8K 当操作数(32)是2的幂时,&%才能获得相同的结果。 - Matthieu

12

你在编写缓冲区时需要列举所需类型,还是需要能够通过动态调用在运行时添加类型?如果是前者,则我会将缓冲区创建为一个n个结构体的堆分配数组,其中每个结构体由两个元素组成:一个枚举标记,用于标识数据类型,和所有数据类型的联合。虽然在存储小元素方面你会失去额外的存储空间,但可以避免处理分配/释放和导致的内存碎片。然后您只需要跟踪定义缓冲区头和尾元素的开始和结束索引,并在递增/递减索引时确保计算 mod n。


需要的类型是枚举的,是的。它们大约只有六种,而且永远不会改变。但持有这些项的队列必须能够容纳所有六种类型。我不确定在队列中存储整个项目而不是指针 - 这意味着复制项目(几百字节)而不是指针。但我喜欢联合的想法 - 我主要关注的是速度而不是内存(我们有足够的内存,但 CPU 却很差:-)。 - paxdiablo
你的回答给了我一个好主意 - 项目的内存可以从malloc()预先分配并从一个专门处理这些内存块的mymalloc()中分配。而且我仍然可以使用指针。对此点赞。 - paxdiablo
根据对数据的访问模式,您可能需要进行额外的复制,也可能不需要。如果您可以在原地构建项目,并在弹出之前在缓冲区中引用它们,则可能不需要进行任何额外的复制。但是,从您自己的分配器中分发它们并使用指针(或索引)的单独数组作为缓冲区肯定更安全和更灵活。 - dewtell

10
首先,标题。如果您使用位整数来保存头和尾“指针”,并调整它们的大小使它们完全同步,则不需要使用模算术来包装缓冲区。即:将4096填充到12位无符号整数中,它本身就是0,没有任何修改。即使对于2的幂,消除模算术也会将速度提高一倍-几乎完全相同。
在我的第三代i7 Dell XPS 8500上,使用Visual Studio 2010的C ++编译器进行默认内联,填充和排空任何类型数据元素的1000万次迭代需要52秒,并且1/8192的时间用于服务一个数据。
我建议重写main()中的测试循环,使它们不再控制流程-应该由返回值指示缓冲区已满或为空以及相应的break;语句来控制。即:填充器和排水器应该能够互相碰撞而不会损坏或不稳定。希望在某个时候我能够将此代码多线程化,届时这种行为将至关重要。

QUEUE_DESC(队列描述符)和初始化函数强制此代码中的所有缓冲区为2的幂。否则,上述方案将无法正常工作。顺便提一下,注意QUEUE_DESC并非硬编码,它使用一个清单常量(#define BITS_ELE_KNT)进行构建。(我假设2的幂在这里足够灵活)

为了使缓冲区大小可以在运行时选择,我尝试了不同的方法(此处未显示),并决定使用USHRTs作为Head、Tail、EleKnt的数据类型,以管理FIFO缓冲区[USHRT]。为避免模数算术,我创建了一个掩码与Head、Tail相&,但该掩码结果为(EleKnt-1),因此只需使用该掩码即可。使用USHRTS而不是位整数可将性能提高约15%,特别是在繁忙的共享机器上,英特尔CPU核心始终比其总线更快,因此打包数据结构可以让您在其他竞争线程之前加载和执行。权衡。

请注意,缓冲区的实际存储是使用calloc()在堆上分配的,并且指针位于结构体的基础上,因此结构体和指针具有完全相同的地址。即;不需要添加偏移量以将结构体地址与寄存器绑定。

在同样的思路下,所有与缓冲服务相关的变量都与缓冲物理相邻,并绑定到同一个结构体中,因此编译器可以生成漂亮的汇编语言。如果不禁用内联优化,你将无法看到任何汇编代码,因为否则它会被压缩成虚无。
为了支持任何数据类型的多态性,我使用了memcpy()而不是赋值。如果您只需要灵活地支持每次编译一个随机变量类型,那么这段代码就完美地工作了。
对于多态性,您只需要知道类型及其存储要求。DATA_DESC描述符数组提供了一种跟踪放入QUEUE_DESC.pBuffer中的每个数据的方法,以便可以正确检索它们。我只分配足够的pBuffer内存来容纳最大数据类型的所有元素,但要跟踪给定数据使用的存储空间量,在DATA_DESC.dBytes中记录。另一种选择是重新发明堆管理器。

这意味着 QUEUE_DESC 的 UCHAR *pBuffer 将有一个相应的伴随数组用于跟踪数据类型和大小,而数据在 pBuffer 中的存储位置将保持不变。新成员将类似于 DATA_DESC *pDataDesc,或者,如果您能找到一种方法来让编译器接受这样的前向引用,可以是 DATA_DESC DataDesc [2 ^ BITS_ELE_KNT]。 在这些情况下,Calloc()始终更具灵活性。

在 Q_Put()、Q_Get() 中仍然需要使用 memcpy(),但实际复制的字节数取决于 DATA_DESC.dBytes,而不是 QUEUE_DESC.EleBytes。对于任何给定的 put 或 get 操作,元素都可能是不同类型/大小的。

我相信这段代码满足速度和缓冲区大小的要求,并且可以满足 6 种不同数据类型的要求。我留下了多个测试固件,以 printf() 语句的形式,这样您就可以自行确认代码是否正常工作。随机数生成器演示了该代码对于任意随机 head/tail 组合的有效性。

enter code here
// Queue_Small.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <stdio.h>
#include <time.h>
#include <limits.h>
#include <stdlib.h>
#include <malloc.h>
#include <memory.h>
#include <math.h>

#define UCHAR unsigned char
#define ULONG unsigned long
#define USHRT unsigned short
#define dbl   double
/* Queue structure */
#define QUEUE_FULL_FLAG 1
#define QUEUE_EMPTY_FLAG -1
#define QUEUE_OK 0
//  
#define BITS_ELE_KNT    12  //12 bits will create 4.096 elements numbered 0-4095
//
//typedef struct    {
//  USHRT dBytes:8;     //amount of QUEUE_DESC.EleBytes storage used by datatype
//  USHRT dType :3; //supports 8 possible data types (0-7)
//  USHRT dFoo  :5; //unused bits of the unsigned short host's storage
// }    DATA_DESC;
//  This descriptor gives a home to all the housekeeping variables
typedef struct  {
    UCHAR   *pBuffer;   //  pointer to storage, 16 to 4096 elements
    ULONG Tail  :BITS_ELE_KNT;  //  # elements, with range of 0-4095
    ULONG Head  :BITS_ELE_KNT;  //  # elements, with range of 0-4095
    ULONG EleBytes  :8;     //  sizeof(elements) with range of 0-256 bytes
    // some unused bits will be left over if BITS_ELE_KNT < 12
    USHRT EleKnt    :BITS_ELE_KNT +1;// 1 extra bit for # elements (1-4096)
    //USHRT Flags   :(8*sizeof(USHRT) - BITS_ELE_KNT +1);   //  flags you can use
    USHRT   IsFull  :1;     // queue is full
    USHRT   IsEmpty :1;     // queue is empty
    USHRT   Unused  :1;     // 16th bit of USHRT
}   QUEUE_DESC;

//  ---------------------------------------------------------------------------
//  Function prototypes
QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz);
int Q_Put(QUEUE_DESC *Q, UCHAR *pNew);
int Q_Get(UCHAR *pOld, QUEUE_DESC *Q);
//  ---------------------------------------------------------------------------
QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz)    {
    memset((void *)Q, 0, sizeof(QUEUE_DESC));//init flags and bit integers to zero
    //select buffer size from powers of 2 to receive modulo 
    //                arithmetic benefit of bit uints overflowing
    Q->EleKnt   =   (USHRT)pow(2.0, BitsForEleKnt);
    Q->EleBytes =   DataTypeSz; // how much storage for each element?
    //  Randomly generated head, tail a test fixture only. 
    //      Demonstrates that the queue can be entered at a random point 
    //      and still perform properly. Normally zero
    srand(unsigned(time(NULL)));    // seed random number generator with current time
    Q->Head = Q->Tail = rand(); // supposed to be set to zero here, or by memset
    Q->Head = Q->Tail = 0;
    //  allocate queue's storage
    if(NULL == (Q->pBuffer = (UCHAR *)calloc(Q->EleKnt, Q->EleBytes)))  {
        return NULL;
    }   else    {
        return Q;
    }
}
//  ---------------------------------------------------------------------------
int Q_Put(QUEUE_DESC *Q, UCHAR *pNew)   
{
    memcpy(Q->pBuffer + (Q->Tail * Q->EleBytes), pNew, Q->EleBytes);
    if(Q->Tail == (Q->Head + Q->EleKnt)) {
        //  Q->IsFull = 1;
        Q->Tail += 1;   
        return QUEUE_FULL_FLAG; //  queue is full
    }
    Q->Tail += 1;   //  the unsigned bit int MUST wrap around, just like modulo
    return QUEUE_OK; // No errors
}
//  ---------------------------------------------------------------------------
int Q_Get(UCHAR *pOld, QUEUE_DESC *Q)   
{
    memcpy(pOld, Q->pBuffer + (Q->Head * Q->EleBytes), Q->EleBytes);
    Q->Head += 1;   //  the bit int MUST wrap around, just like modulo

    if(Q->Head == Q->Tail)      {
        //  Q->IsEmpty = 1;
        return QUEUE_EMPTY_FLAG; // queue Empty - nothing to get
    }
    return QUEUE_OK; // No errors
}
//
//  ---------------------------------------------------------------------------
int _tmain(int argc, _TCHAR* argv[])    {
//  constrain buffer size to some power of 2 to force faux modulo arithmetic
    int LoopKnt = 1000000;  //  for benchmarking purposes only
    int k, i=0, Qview=0;
    time_t start;
    QUEUE_DESC Queue, *Q;
    if(NULL == (Q = Q_Init(&Queue, BITS_ELE_KNT, sizeof(int)))) {
        printf("\nProgram failed to initialize. Aborting.\n\n");
        return 0;
    }

    start = clock();
    for(k=0; k<LoopKnt; k++)    {
        //printf("\n\n Fill'er up please...\n");
        //Q->Head = Q->Tail = rand();
        for(i=1; i<= Q->EleKnt; i++)    {
            Qview = i*i;
            if(QUEUE_FULL_FLAG == Q_Put(Q, (UCHAR *)&Qview))    {
                //printf("\nQueue is full at %i \n", i);
                //printf("\nQueue value of %i should be %i squared", Qview, i);
                break;
            }
            //printf("\nQueue value of %i should be %i squared", Qview, i);
        }
        //  Get data from queue until completely drained (empty)
        //
        //printf("\n\n Step into the lab, and see what's on the slab... \n");
        Qview = 0;
        for(i=1; i; i++)    {
            if(QUEUE_EMPTY_FLAG == Q_Get((UCHAR *)&Qview, Q))   {
                //printf("\nQueue value of %i should be %i squared", Qview, i);
                //printf("\nQueue is empty at %i", i);
                break;
            }
            //printf("\nQueue value of %i should be %i squared", Qview, i);
        }
        //printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail);
    }
    printf("\nQueue time was %5.3f to fill & drain %i element queue  %i times \n", 
                     (dbl)(clock()-start)/(dbl)CLOCKS_PER_SEC,Q->EleKnt, LoopKnt);
    printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail);
    getchar();
    return 0;
}

7
您的代码出现了多个问题。首先,在您的 "Q_Put" 方法中比较 Q->Tail == (Q->Head + Q->EleKnt) 永远不会返回 true,因为 Q->Head + Q->EleKnt 不是模加法,这意味着下一次写操作将简单地覆盖头部。如果 EleKnt4096,那么您的 Tail 将永远无法到达该值。其次,如果将其用作生产者/消费者队列,将会造成严重破坏,因为您的 Q_Put "先写后问",即使在意识到队列已满之后仍会更新 Tail。下一次调用 Q_Put 将会像什么都没有发生一样覆盖头部。 - vgru
2
你应该分析维基百科页面上关于循环缓冲区的各种算法,即满或空缓冲区区别问题。使用你目前的方法,你需要在缓冲区中保留一个元素以了解满和空之间的差异。 - vgru
5
@RocketRoy: 我刚才用的是Visual Studio 2012。你可以在线查看结果(标准输出在页面底部),这样我们就能确保看的是同一段代码。测试程序的输出在页面底部,并确认了我之前所写的内容:只要不试图填满它,它就“完美地工作”。这也说明了它作为FIFO缓冲区毫无用处。 :) - vgru
5
具有讽刺意味的是,我通过你在这个回答中的评论来到了这篇文章,你声称@AdamDavis的代码片段无法工作,而实际情况恰恰相反。请注意Adam一旦检测到“Put”已经满了就立即返回,而你却仍然复制数据然后再进行检查。 - vgru
4
@RocketRoy: 所以你真的告诉我你仍然不同意你的代码有问题吗?是的,我非常确定你的代码不会“移动间隙”,因为它只是完美地工作并覆盖头部,没有办法检测满时。我希望你不要在任何生命关切的系统中使用它,否则你可能会遇到严重麻烦。 - vgru
显示剩余2条评论

9

这里是一份用C语言编写的简单解决方案。假设每个函数中断都已关闭。 没有多态和其他复杂的东西,只有常识。


#define BUFSIZE 128
char buf[BUFSIZE];
char *pIn, *pOut, *pEnd;
char full;

// init
void buf_init()
{
    pIn = pOut = buf;       // init to any slot in buffer
    pEnd = &buf[BUFSIZE];   // past last valid slot in buffer
    full = 0;               // buffer is empty
}

// add char 'c' to buffer
int buf_put(char c)
{
    if (pIn == pOut  &&  full)
        return 0;           // buffer overrun

    *pIn++ = c;             // insert c into buffer
    if (pIn >= pEnd)        // end of circular buffer?
        pIn = buf;          // wrap around

    if (pIn == pOut)        // did we run into the output ptr?
        full = 1;           // can't add any more data into buffer
    return 1;               // all OK
}

// get a char from circular buffer
int buf_get(char *pc)
{
    if (pIn == pOut  &&  !full)
        return 0;           // buffer empty  FAIL

    *pc = *pOut++;              // pick up next char to be returned
    if (pOut >= pEnd)       // end of circular buffer?
        pOut = buf;         // wrap around

    full = 0;               // there is at least 1 slot
    return 1;               // *pc has the data to be returned
}

1
应该使用 pIn > pEnd 而不是 pIn >= pEnd,否则您将永远无法填充 buf 的最后一个插槽;对于 pOut >= pEnd 也是如此。 - Dmitry Kurilo

2

一个C风格的简单整数环形缓冲区。先使用init初始化,然后使用put和get来操作。如果缓冲区不包含任何数据,它将返回“0”零。

//=====================================
// ring buffer address based
//=====================================
#define cRingBufCount   512
int     sRingBuf[cRingBufCount];    // Ring Buffer
int     sRingBufPut;                // Input index address
int     sRingBufGet;                // Output index address
Bool    sRingOverWrite;

void    GetRingBufCount(void)
{
int     r;
`       r= sRingBufPut - sRingBufGet;
        if ( r < cRingBufCount ) r+= cRingBufCount;
        return r; 
}

void    InitRingBuffer(void)
{
        sRingBufPut= 0;
        sRingBufGet= 0;
}       

void    PutRingBuffer(int d)
{
        sRingBuffer[sRingBufPut]= d;
        if (sRingBufPut==sRingBufGet)// both address are like ziro
        {
            sRingBufPut= IncRingBufferPointer(sRingBufPut);
            sRingBufGet= IncRingBufferPointer(sRingBufGet);
        }
        else //Put over write a data
        {
            sRingBufPut= IncRingBufferPointer(sRingBufPut);
            if (sRingBufPut==sRingBufGet)
            {
                sRingOverWrite= Ture;
                sRingBufGet= IncRingBufferPointer(sRingBufGet);
            }
        }
}

int     GetRingBuffer(void)
{
int     r;
        if (sRingBufGet==sRingBufPut) return 0;
        r= sRingBuf[sRingBufGet];
        sRingBufGet= IncRingBufferPointer(sRingBufGet);
        sRingOverWrite=False;
        return r;
}

int     IncRingBufferPointer(int a)
{
        a+= 1;
        if (a>= cRingBufCount) a= 0;
        return a;
}

2
一个简单的实现可以包括:
  • 一个缓冲区,作为大小为n的数组,类型可根据需要而定
  • 一个读指针或索引(取决于您的处理器哪个更有效率)
  • 一个写指针或索引
  • 一个计数器,指示缓冲区中有多少数据(可以从读和写指针派生,但单独跟踪速度更快)
每次写入数据时,您都会推进写指针并增加计数器。读取数据时,您会增加读指针并减少计数器。如果任一指针达到n,则将其设置为零。
如果计数器=n,则无法写入。如果计数器=0,则无法读取。

当读指针和写指针都指向同一位置时,如何从指针中推导出计数器?在这种情况下,缓冲区可能为空或已满,并且需要一个计数器(或存储缓冲区是否已满的标志)。 - dedoussis
@DimitriosDedoussis:可以采用这些建议之一,或者您可以说如果指针指向相同位置,则缓冲区为空,如果写指针指向读指针之前的位置,则缓冲区已满(并且不允许写指针前进以匹配读指针)。 - Steve Melnikoff
没错。在这种情况下,缓冲区的长度必须为n+1,以允许容量为n。我想要强调的是,除非在缓冲机制中进行一些变通(如增加缓冲区的长度)或添加一个有状态的布尔标志,否则计数器无法从指针中派生。 - dedoussis

1

@Adam Rosenfield的solution虽然正确,但可以使用更轻量级的circular_buffer结构来实现,而不涉及countcapacity

该结构只能保存以下4个指针:

  • buffer:指向内存中缓冲区的起始位置。
  • buffer_end:指向内存中缓冲区的末尾位置。
  • head:指向存储数据的末尾。
  • tail:指向存储数据的起始位置。

我们可以保留sz属性,以允许存储单位的参数化。

以上指针可以推导出countcapacity的值。

容量

capacity很简单,它可以通过将buffer_end指针和buffer指针之间的距离除以存储单位sz来推导出(下面的代码片段是伪代码):

capacity = (buffer_end - buffer) / sz

计数

对于计数,情况会变得有些复杂。例如,在headtail指向同一位置的情况下,无法确定缓冲区是空还是满。

为了解决这个问题,缓冲区应该为额外的元素分配内存。例如,如果我们循环缓冲区的期望容量是10 * sz,那么我们需要分配11 * sz

容量公式将变为(以下代码片段是伪代码):

capacity_bytes = buffer_end - buffer - sz
capacity = capacity_bytes / sz

这个额外的语义元素使我们能够构建条件,评估缓冲区是否为空或已满。
空状态条件
为了使缓冲区为空,head指针必须指向与tail指针相同的位置:
head == tail

如果上述条件为真,则缓冲区为空。

完整状态条件

为了使缓冲区变满,head指针应该在tail指针的后面1个元素。因此,为了从head位置跳到tail位置所需覆盖的空间应等于1 * sz

如果tail大于head:

tail - head == sz

如果上述条件为真,则缓冲区已满。
如果head大于tail:
  1. buffer_end - head 返回从头部跳到缓冲区末尾的空间。
  2. tail - buffer 返回从缓冲区开头跳到尾部所需的空间。
  3. 将上述两个空间相加应等于从头部跳到尾部所需的空间。
  4. 步骤3中得出的空间不应超过1 * sz
(buffer_end - head) + (tail - buffer) == sz
=> buffer_end - buffer - head + tail == sz
=> buffer_end - buffer - sz == head - tail
=> head - tail == buffer_end - buffer - sz
=> head - tail == capacity_bytes

如果上述条件为真,则缓冲区已满。

实践中

修改 @Adam Rosenfield 的代码,使用上述的 circular_buffer 结构:

#include <string.h>

#define CB_SUCCESS 0        /* CB operation was successful */
#define CB_MEMORY_ERROR 1   /* Failed to allocate memory */
#define CB_OVERFLOW_ERROR 2 /* CB is full. Cannot push more items. */
#define CB_EMPTY_ERROR 3    /* CB is empty. Cannot pop more items. */

typedef struct circular_buffer {
  void *buffer;
  void *buffer_end;
  size_t sz;
  void *head;
  void *tail;
} circular_buffer;

int cb_init(circular_buffer *cb, size_t capacity, size_t sz) {
  const int incremented_capacity = capacity + 1; // Add extra element to evaluate count
  cb->buffer = malloc(incremented_capacity * sz);
  if (cb->buffer == NULL)
    return CB_MEMORY_ERROR;
  cb->buffer_end = (char *)cb->buffer + incremented_capacity * sz;
  cb->sz = sz;
  cb->head = cb->buffer;
  cb->tail = cb->buffer;
  return CB_SUCCESS;
}

int cb_free(circular_buffer *cb) {
  free(cb->buffer);
  return CB_SUCCESS;
}

const int _cb_length(circular_buffer *cb) {
  return (char *)cb->buffer_end - (char *)cb->buffer;
}

int cb_push_back(circular_buffer *cb, const void *item) {
  const int buffer_length = _cb_length(cb);
  const int capacity_length = buffer_length - cb->sz;

  if ((char *)cb->tail - (char *)cb->head == cb->sz ||
      (char *)cb->head - (char *)cb->tail == capacity_length)
    return CB_OVERFLOW_ERROR;

  memcpy(cb->head, item, cb->sz);

  cb->head = (char*)cb->head + cb->sz;
  if(cb->head == cb->buffer_end)
    cb->head = cb->buffer;

  return CB_SUCCESS;
}

int cb_pop_front(circular_buffer *cb, void *item) {
  if (cb->head == cb->tail)
    return CB_EMPTY_ERROR;

  memcpy(item, cb->tail, cb->sz);

  cb->tail = (char*)cb->tail + cb->sz;
  if(cb->tail == cb->buffer_end)
    cb->tail = cb->buffer;

  return CB_SUCCESS;
}


0
扩展adam-rosenfield的解决方案,我认为以下内容适用于多线程单生产者-单消费者场景。
int cb_push_back(circular_buffer *cb, const void *item)
{
  void *new_head = (char *)cb->head + cb->sz;
  if (new_head == cb>buffer_end) {
      new_head = cb->buffer;
  }
  if (new_head == cb->tail) {
    return 1;
  }
  memcpy(cb->head, item, cb->sz);
  cb->head = new_head;
  return 0;
}

int cb_pop_front(circular_buffer *cb, void *item)
{
  void *new_tail = cb->tail + cb->sz;
  if (cb->head == cb->tail) {
    return 1;
  }
  memcpy(item, cb->tail, cb->sz);
  if (new_tail == cb->buffer_end) {
    new_tail = cb->buffer;
  }
  cb->tail = new_tail;
  return 0;
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接