我想分享一下我解决这个问题的经验。我能感受到你的痛苦和困惑,但实际上,如果你知道该做什么,考虑到你拥有的各种选项,实现一个工作解决方案并不难。
目标
实现一个缓冲池,可执行两个操作——获取和释放。
基本池策略:
- 获取从池中提取一个缓冲区,有效地减少可用缓冲区的数量1;
- 如果没有可用的缓冲区,则出现两个选项:
- 增加池并返回新创建的缓冲区;或
- 创建并返回一个虚拟缓冲区(下面会解释)。
- 释放将缓冲区返回给池。
池可以是固定大小或可变大小。“可变”意味着最初有M个预分配的缓冲区(例如为零),并且在需要时池可以增长到N。 “固定”意味着所有缓冲区都在池创建时预分配(M = N)。
实现一个回调函数,为libuv获取缓冲区。
不要允许无限制的池增长,除了内存不足的情况外,在任何情况下仍要使池正常运行。
实现
现在,让我们更详细地介绍所有这些内容。
池结构:
typedef struct bufpool_s bufpool_t;
struct bufpool_s {
void *bufs[BUFPOOL_CAPACITY];
int size;
};
size
是当前池大小。
缓冲区本身是一个带有以下结构前缀的内存块:
#define bufbase(ptr) ((bufbase_t *)((char *)(ptr) - sizeof(bufbase_t)))
#define buflen(ptr) (bufbase(ptr)->len)
typedef struct bufbase_s bufbase_t;
struct bufbase_s {
bufpool_t *pool;
int len;
};
len
是缓冲区的字节长度。
分配新缓冲区的方式如下:
void *bufpool_alloc(bufpool_t *pool, int len) {
bufbase_t *base = malloc(sizeof(bufbase_t) + len);
if (!base) return 0;
base->pool = pool;
base->len = len;
return (char *)base + sizeof(bufbase_t);
}
注意,返回的指针指向头部后面的下一个字节 - 数据区域。这允许将缓冲区指针视为通过标准调用 malloc
分配。
释放则相反:
void bufpool_free(void *ptr) {
if (!ptr) return;
free(bufbase(ptr));
}
libuv的分配回调函数长这样:
void alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
int len;
void *ptr = bufpool_acquire(handle->loop->data, &len);
*buf = uv_buf_init(ptr, len);
}
您可以在这里看到,alloc_cb
从循环的用户数据指针中获取一个缓冲池的指针。这意味着,在使用之前,应将缓冲池附加到事件循环上。换句话说,您应该在创建循环时初始化池并将其指针分配给data
字段。如果您已经在该字段中持有其他用户数据,请扩展您的结构。
一个“虚拟缓冲区”是一个假缓冲区,这意味着它不是来自池,但仍然是完全有效的。虚拟缓冲区的目的是在稀有情况下保持整个系统工作,即当所有缓冲区都被获取且需要另一个缓冲区时。根据我的研究,在所有现代操作系统上,关于8KB的小内存块的分配非常快速 - 这非常适合虚拟缓冲区的大小。
#define DUMMY_BUF_SIZE 8000
void *bufpool_dummy() {
return bufpool_alloc(0, DUMMY_BUF_SIZE);
}
acquire 操作:
void *bufpool_acquire(bufpool_t *pool, int *len) {
void *buf = bufpool_dequeue(pool);
if (!buf) buf = bufpool_dummy();
*len = buf ? buflen(buf) : 0;
return buf;
}
发布操作:
void bufpool_release(void *ptr) {
bufbase_t *base;
if (!ptr) return;
base = bufbase(ptr);
if (base->pool) bufpool_enqueue(base->pool, ptr);
else free(base);
}
这里有两个函数 -
bufpool_enqueue
和
bufpool_dequeue
。基本上,它们执行池的所有工作。
在我的情况下,一个O(1)队列的缓冲索引位于上述内容之上,允许我更高效地跟踪池的状态,非常快地获取缓冲区的索引。不必像我这样极端操作,因为池的最大尺寸是有限制的,因此任何数组搜索也将在常数时间内完成。
在最简单的情况下,你可以通过在
bufpool_s
结构体中的
bufs
数组中进行纯线性搜索来实现这些函数。例如,如果获取一个缓冲区,就搜索第一个非空位置,保存指针并将NULL放入该位置。下次释放缓冲区时,搜索第一个NULL位置并将其指针保存在那里。
池的内部如下:
#define BUF_SIZE 64000
void *bufpool_grow(bufpool_t *pool) {
int idx = pool->size;
void *buf;
if (idx == BUFPOOL_CAPACITY) return 0;
buf = bufpool_alloc(pool, BUF_SIZE);
if (!buf) return 0;
pool->bufs[idx] = 0;
pool->size = idx + 1;
return buf;
}
void bufpool_enqueue(bufpool_t *pool, void *ptr) {
int idx;
for (idx = 0; idx < pool->size; ++idx) {
if (!pool->bufs[idx]) break;
}
assert(idx < pool->size);
pool->bufs[idx] = ptr;
}
void *bufpool_dequeue(bufpool_t *pool) {
int idx;
void *ptr;
for (idx = 0; idx < pool->size; ++idx) {
ptr = pool->bufs[idx];
if (ptr) {
pool->bufs[idx] = 0;
return ptr;
}
}
return bufpool_grow(pool);
}
普通缓冲区大小为64000字节,因为我希望它能够舒适地适应具有标头的64Kb块。
最后,初始化和去初始化例程:
void bufpool_init(bufpool_t *pool) {
pool->size = 0;
}
void bufpool_done(bufpool_t *pool) {
int idx;
for (idx = 0; idx < pool->size; ++idx) bufpool_free(pool->bufs[idx]);
}
请注意,此实现仅为说明目的而简化。在真实世界的情况下,将很可能需要缩小池策略。
使用
现在,您应该能够编写自己的libuv回调函数了:
void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
/* ... */
bufpool_release(buf->base); /* Release the buffer */
}
循环初始化:
uv_loop_t *loop = malloc(sizeof(*loop));
bufpool_t *pool = malloc(sizeof(*pool));
uv_loop_init(loop);
bufpool_init(pool);
loop->data = pool;
操作:
uv_tcp_t *tcp = malloc(sizeof(*tcp));
uv_tcp_init(tcp);
/* ... */
uv_read_start((uv_handle_t *)tcp, alloc_cb, read_cb);
更新 (2016年8月2日)
当获取缓冲区时,根据所请求的大小使用自适应策略,并仅在请求大块数据(例如所有读取和长写入)时返回池化缓冲区。对于其他情况(例如大部分写操作),返回虚拟缓冲区。这将有助于避免浪费池化缓冲区,同时保持可接受的分配速度。例如:
void alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
int len = size;
void *ptr = bufpool_acquire(handle->loop->data, &len);
*buf = uv_buf_init(ptr, len);
}
void *bufpool_acquire(bufpool_t *pool, int *len) {
int size = *len;
if (size > DUMMY_BUF_SIZE) {
buf = bufpool_dequeue(pool);
if (buf) {
if (size > BUF_SIZE) *len = BUF_SIZE;
return buf;
}
size = DUMMY_BUF_SIZE;
}
buf = bufpool_alloc(0, size);
*len = buf ? size : 0;
return buf;
}
使用此代码片段时,不需要buflen
和bufpool_dummy
。