libuv 分配内存缓冲区的重用技巧

10
我正在使用libuv来开发一个涉及大量网络交互的应用程序,我关注的是在libuv回调延迟执行的情况下,哪些重复使用已分配内存的技术既高效又安全。
在与libuv用户进行基本层交互时,需要指定缓冲区分配回调函数并设置读取处理程序。
UV_EXTERN int uv_read_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read_cb read_cb);

uv_alloc_cb是指

typedef void (*uv_alloc_cb)(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);

但是问题在于:每次通过handle传入新消息时(比如,每次接收到uv_udp_t handle的UDP数据报),都会调用内存分配回调函数,并且直接为每个传入的UDP数据报分配新缓冲区似乎非常不节省内存。

因此,我希望了解一些通用的C技术(可能在由libuv回调系统介绍的延迟执行上下文中),以实现在可能时重复使用相同的分配内存。

另外,如果可能的话,我想保持windows可移植性。

注:

  • 我知道这个问题:Does libuv provide any facilities to attach a buffer to a connection and re use it;除了说明静态分配的缓冲区不行之外,它并没有回答如何正确地使用libuv进行内存分配。特别是,它没有涵盖将缓冲区附加到处理程序的安全性方面(无论是通过包装器结构还是通过handle->data上下文)(在同一缓冲区上进行延迟写回调时,可能与libuv主循环的多个迭代中的另一个读回调调用重叠)。
  • 阅读http://nikhilm.github.io/uvbook/filesystem.html,我注意到在片段uvtee/main.c - Write to pipe下面有以下短语:

    我们进行了复制,以便可以独立于彼此释放两个调用写入数据的缓冲区。虽然对于像这样的演示程序而言是可以接受的,但在任何大型应用程序中,您可能需要更智能的内存管理,如引用计数缓冲区或缓冲区池。

    但是我无法找到涉及libuv缓冲区的引用计数解决方案(如何进行适当的处理?),也没有明确的缓冲区池示例在libuv环境中(是否有相关的库?)。

3个回答

22

我想分享一下我解决这个问题的经验。我能感受到你的痛苦和困惑,但实际上,如果你知道该做什么,考虑到你拥有的各种选项,实现一个工作解决方案并不难。

目标

  1. 实现一个缓冲池,可执行两个操作——获取释放

  2. 基本池策略:

    • 获取从池中提取一个缓冲区,有效地减少可用缓冲区的数量1;
    • 如果没有可用的缓冲区,则出现两个选项:
      • 增加池并返回新创建的缓冲区;或
      • 创建并返回一个虚拟缓冲区(下面会解释)。
    • 释放将缓冲区返回给池。
  3. 池可以是固定大小或可变大小。“可变”意味着最初有M个预分配的缓冲区(例如为零),并且在需要时池可以增长到N。 “固定”意味着所有缓冲区都在池创建时预分配(M = N)。

  4. 实现一个回调函数,为libuv获取缓冲区。

  5. 不要允许无限制的池增长,除了内存不足的情况外,在任何情况下仍要使池正常运行。

实现

现在,让我们更详细地介绍所有这些内容。

池结构:

#define BUFPOOL_CAPACITY 100

typedef struct bufpool_s bufpool_t;

struct bufpool_s {
    void *bufs[BUFPOOL_CAPACITY];
    int size;
};

size 是当前池大小。

缓冲区本身是一个带有以下结构前缀的内存块:

#define bufbase(ptr) ((bufbase_t *)((char *)(ptr) - sizeof(bufbase_t)))
#define buflen(ptr) (bufbase(ptr)->len)

typedef struct bufbase_s bufbase_t;

struct bufbase_s {
    bufpool_t *pool;
    int len;
};

len是缓冲区的字节长度。

分配新缓冲区的方式如下:

void *bufpool_alloc(bufpool_t *pool, int len) {
    bufbase_t *base = malloc(sizeof(bufbase_t) + len);
    if (!base) return 0;
    base->pool = pool;
    base->len = len;
    return (char *)base + sizeof(bufbase_t);
}

注意,返回的指针指向头部后面的下一个字节 - 数据区域。这允许将缓冲区指针视为通过标准调用 malloc 分配。

释放则相反:

void bufpool_free(void *ptr) {
    if (!ptr) return;
    free(bufbase(ptr));
}

libuv的分配回调函数长这样:

void alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
    int len;
    void *ptr = bufpool_acquire(handle->loop->data, &len);
    *buf = uv_buf_init(ptr, len);
}

您可以在这里看到,alloc_cb 从循环的用户数据指针中获取一个缓冲池的指针。这意味着,在使用之前,应将缓冲池附加到事件循环上。换句话说,您应该在创建循环时初始化池并将其指针分配给data字段。如果您已经在该字段中持有其他用户数据,请扩展您的结构。

一个“虚拟缓冲区”是一个假缓冲区,这意味着它不是来自池,但仍然是完全有效的。虚拟缓冲区的目的是在稀有情况下保持整个系统工作,即当所有缓冲区都被获取且需要另一个缓冲区时。根据我的研究,在所有现代操作系统上,关于8KB的小内存块的分配非常快速 - 这非常适合虚拟缓冲区的大小。

#define DUMMY_BUF_SIZE 8000

void *bufpool_dummy() {
    return bufpool_alloc(0, DUMMY_BUF_SIZE);
}

acquire 操作:

void *bufpool_acquire(bufpool_t *pool, int *len) {
    void *buf = bufpool_dequeue(pool);
    if (!buf) buf = bufpool_dummy();
    *len = buf ? buflen(buf) : 0;
    return buf;
}

发布操作:

void bufpool_release(void *ptr) {
    bufbase_t *base;
    if (!ptr) return;
    base = bufbase(ptr);
    if (base->pool) bufpool_enqueue(base->pool, ptr);
    else free(base);
}
这里有两个函数 - bufpool_enqueuebufpool_dequeue。基本上,它们执行池的所有工作。
在我的情况下,一个O(1)队列的缓冲索引位于上述内容之上,允许我更高效地跟踪池的状态,非常快地获取缓冲区的索引。不必像我这样极端操作,因为池的最大尺寸是有限制的,因此任何数组搜索也将在常数时间内完成。
在最简单的情况下,你可以通过在bufpool_s 结构体中的bufs数组中进行纯线性搜索来实现这些函数。例如,如果获取一个缓冲区,就搜索第一个非空位置,保存指针并将NULL放入该位置。下次释放缓冲区时,搜索第一个NULL位置并将其指针保存在那里。
池的内部如下:
#define BUF_SIZE 64000

void *bufpool_grow(bufpool_t *pool) {
    int idx = pool->size;
    void *buf;
    if (idx == BUFPOOL_CAPACITY) return 0;
    buf = bufpool_alloc(pool, BUF_SIZE);
    if (!buf) return 0;
    pool->bufs[idx] = 0;
    pool->size = idx + 1;
    return buf;
}

void bufpool_enqueue(bufpool_t *pool, void *ptr) {
    int idx;
    for (idx = 0; idx < pool->size; ++idx) {
        if (!pool->bufs[idx]) break;
    }
    assert(idx < pool->size);
    pool->bufs[idx] = ptr;
}

void *bufpool_dequeue(bufpool_t *pool) {
    int idx;
    void *ptr;
    for (idx = 0; idx < pool->size; ++idx) {
        ptr = pool->bufs[idx];
        if (ptr) {
            pool->bufs[idx] = 0;
            return ptr;
        }
    }
    return bufpool_grow(pool);
}

普通缓冲区大小为64000字节,因为我希望它能够舒适地适应具有标头的64Kb块。

最后,初始化和去初始化例程:

void bufpool_init(bufpool_t *pool) {
    pool->size = 0;
}

void bufpool_done(bufpool_t *pool) {
    int idx;
    for (idx = 0; idx < pool->size; ++idx) bufpool_free(pool->bufs[idx]);
}
请注意,此实现仅为说明目的而简化。在真实世界的情况下,将很可能需要缩小池策略。
使用
现在,您应该能够编写自己的libuv回调函数了:
void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
    /* ... */
    bufpool_release(buf->base); /* Release the buffer */
}

循环初始化:

uv_loop_t *loop = malloc(sizeof(*loop));
bufpool_t *pool = malloc(sizeof(*pool));
uv_loop_init(loop);
bufpool_init(pool);
loop->data = pool;

操作:

uv_tcp_t *tcp = malloc(sizeof(*tcp));
uv_tcp_init(tcp);
/* ... */
uv_read_start((uv_handle_t *)tcp, alloc_cb, read_cb);

更新 (2016年8月2日)

当获取缓冲区时,根据所请求的大小使用自适应策略,并仅在请求大块数据(例如所有读取和长写入)时返回池化缓冲区。对于其他情况(例如大部分写操作),返回虚拟缓冲区。这将有助于避免浪费池化缓冲区,同时保持可接受的分配速度。例如:

void alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
    int len = size; /* Requested buffer size */
    void *ptr = bufpool_acquire(handle->loop->data, &len);
    *buf = uv_buf_init(ptr, len);
}

void *bufpool_acquire(bufpool_t *pool, int *len) {
    int size = *len;
    if (size > DUMMY_BUF_SIZE) {
        buf = bufpool_dequeue(pool);
        if (buf) {
            if (size > BUF_SIZE) *len = BUF_SIZE;
            return buf;
        }
        size = DUMMY_BUF_SIZE;
    }
    buf = bufpool_alloc(0, size);
    *len = buf ? size : 0;
    return buf;
}

使用此代码片段时,不需要buflenbufpool_dummy


你是否愿意使用MIT/BSD或LGPL v3许可证发布这段代码? - Carlos Melo
1
许可证噩梦再次袭来... =) 我相信 SO 上的所有内容默认都是根据知识共享许可协议发布的。我不确定如何将我的这个巧妙作品进一步发布为 MIT/BSD 许可证,但我不介意。 - neoxic
是的,这个许可证问题真是个噩梦,再加上我的多疑只会让事情变得更糟。:) 谢谢。 - Carlos Melo
@edwinc 由于它们的容量有限,它们不会“非常慢”。它们本质上是O(1)。此外,文章中还有关于这个问题的特别说明。 - neoxic
假设一个事件循环正在服务于 1000 个套接字。前面的 900 个缓冲区被分配到了服务手机流量的套接字上,并且不能及时返回。循环可能会迭代数百次才能获得新的缓冲区。即使是 1 微秒的延迟,对于每秒处理 1 百万条消息的高端服务器而言,也可能会额外耗费 1 秒。我想,在某些方面,“慢”是相对的。在金融行业中,这就会变慢,但对于类似 Web 浏览器的应用程序来说,这可能还好。链表需要修改两个指针来进行队列排队或者出队,无论缓冲区有多少个。 - edwinc
显示剩余4条评论

1
如果您使用的是Linux,那么您很幸运。Linux内核通常默认使用所谓的SLAB Allocator。这种分配器的优点在于通过维护可回收块的池来减少实际的内存分配。对于您来说,这意味着只要您始终分配相同大小的缓冲区(理想情况下为PAGE_SIZE的pow2大小),则可以在Linux上使用malloc()
如果您不在Linux(或FreeBSD或Solaris)上,或者您正在开发跨平台应用程序,则可以考虑使用glib及其Memory Slices,它们是SLAB分配器的跨平台实现。它在支持它的平台上使用本地实现,因此在Linux上使用它不会带来任何优势(我自己进行了一些测试)。我相信还有其他库可以做到这一点,或者您可以自行实现。

如果你在Linux上测量默认的malloc()在大于16Kb的大块内存上的性能,你会发现它比较小尺寸的内存分配要慢得多。另一方面,libuv默认需要一个大小为64Kb的缓冲区。因此,如果你关心细粒度的性能,使用默认的malloc()分配这样一个大缓冲区并不过分奢侈。 - neoxic
@neoxic 我需要使用更大的块大小运行一些测试,但是一般的想法是,如果您使用 malloc() 然后 free(),然后再次使用相同的块大小进行 malloc(),第二次应该会快得多。 - dtoux
1
请这样做。这正是我所做的,但我感到非常失望。后来我发现标准的malloc()会单独处理/存储小尺寸和大尺寸。 - neoxic

0

让我们重复回调函数的签名:

void alloc_cb(uv_handle_t* handle, size_t, uv_buf_t*);

我将handle->data设置为指向一个结构体/对/元组,例如:

auto t(std::make_tuple(blah1, blah2, blah3));

这使我能够与cb共享任意数据。 我所做的是将结构/对/元组数据成员之一设置为我的缓冲区:

char data[65536];

然后我只需在cb中使用缓冲区:

extern "C"
inline void uv_alloc_cb(uv_handle_t* const uvh, std::size_t const sz,
  uv_buf_t* const buf) noexcept
{
  auto const p(static_cast<std::pair<void*, char*>*>(uvh->data));

  buf->base = std::get<1>(*p);
  buf->len = 65536;
}

这是超级快的,不需要动态分配。我认为libuv API有点临时性,没有经过深思熟虑,实现也不够完善。为什么要求这个任意的64k缓冲区?如果我不提供64k,libuv就会非常不高兴,尽管它不会崩溃。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接