Skip to content

Commit

Permalink
integrate with masterpool
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Aug 3, 2024
1 parent 7e3c246 commit 496706b
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 75 deletions.
228 changes: 157 additions & 71 deletions ww/buffer_pool.c
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
#include "buffer_pool.h"
#include "hplatform.h"
#ifdef OS_LINUX
#include <malloc.h>
#endif
#ifdef DEBUG
#include "loggers/network_logger.h"
#endif
Expand All @@ -19,95 +16,163 @@
#define MEMORY_PROFILE_SMALL (RAM_PROFILE >= kRamProfileM1Memory ? kRamProfileM1Memory : RAM_PROFILE)
#define MEMORY_PROFILE_SELECTED RAM_PROFILE

#define BASE_READ_BUFSIZE (1U << 13) // 8k
#define SMALL_BUFSIZE 1500
#define BUFFERPOOL_SMALL_CONTAINER_LEN ((unsigned long) ((MEMORY_PROFILE_SMALL)))
#define BUFFERPOOL_CONTAINER_LEN ((unsigned long) ((MEMORY_PROFILE_SELECTED)))

#define BUFFER_SIZE_MORE \
(((int) (MEMORY_PROFILE_SELECTED / 16)) > 1 ? (((int) (MEMORY_PROFILE_SELECTED / 16)) - 1) : (0))

#define BUFFER_SIZE (RAM_PROFILE >= kRamProfileS2Memory ? (1U << 15) : (1U << 12)) // 32k (same as nginx file streaming)

// #define BUFFER_SIZE (BASE_READ_BUFSIZE + (BASE_READ_BUFSIZE * BUFFER_SIZE_MORE)) // [8k,32k]

// #define BUFFER_SIZE (((int) (MEMORY_PROFILE_SELECTED / 16)) >= 1 ? (1U << 15) : (1U << 12) ) // [4k,32k]

#define BUFFER_SIZE_SMALL (1U << 12) // 4k

// NOLINTEND

struct buffer_pool_s
{
unsigned int len;

uint16_t cap;
uint16_t free_threshould;
uint8_t tid;
unsigned int buffers_size;
unsigned int large_buffers_len;
unsigned int small_buffers_len;
unsigned int large_buffers_size;
unsigned int small_buffers_size;
#if defined(DEBUG) && defined(BUFFER_POOL_DEBUG)
atomic_size_t in_use;
#endif
shift_buffer_t *available[];
master_pool_t *large_buffers_mp;
shift_buffer_t **large_buffers;
master_pool_t *small_buffers_mp;
shift_buffer_t **small_buffers;
uint8_t tid;
};

static void firstCharge(buffer_pool_t *pool)
// NOLINTEND
static inline bool isLargeBuffer(buffer_pool_t *pool, shift_buffer_t *buf)
{
for (size_t i = 0; i < (pool->cap / 2); i++)
{
pool->available[i] = newShiftBuffer(pool->tid, pool->buffers_size);
}
pool->len = pool->cap / 2;
return bufCap(buf) >= pool->large_buffers_size ? true : false;
}

static void reCharge(buffer_pool_t *pool)
static master_pool_item_t *createLargeBufHandle(struct master_pool_s *pool, void *userdata)
{
const size_t increase = min((pool->cap - pool->len), pool->cap / 2);
(void) pool;
buffer_pool_t *bpool = userdata;
return newShiftBuffer(bpool->tid, bpool->large_buffers_size);
}

for (size_t i = pool->len; i < (pool->len + increase); i++)
{
pool->available[i] = newShiftBuffer(pool->tid, pool->buffers_size);
}
pool->len += increase;
static master_pool_item_t *createSmallBufHandle(struct master_pool_s *pool, void *userdata)
{
(void) pool;
buffer_pool_t *bpool = userdata;
return newShiftBuffer(bpool->tid, bpool->small_buffers_size);
}

static void destroyLargeBufHandle(struct master_pool_s *pool, master_pool_item_t *item, void *userdata)
{
(void) pool;
buffer_pool_t *bpool = userdata;
destroyShiftBuffer(bpool->tid, item);
}

static void destroySmallBufHandle(struct master_pool_s *pool, master_pool_item_t *item, void *userdata)
{
(void) pool;
buffer_pool_t *bpool = userdata;
destroyShiftBuffer(bpool->tid, item);
}

static void reChargeLargeBuffers(buffer_pool_t *pool)
{
const size_t increase = min((pool->cap - pool->large_buffers_len), pool->cap / 2);

popMasterPoolItems(pool->large_buffers_mp, (void const **) &(pool->large_buffers[pool->large_buffers_len]),
(pool->large_buffers_len + increase), pool);

pool->large_buffers_len += increase;
#if defined(DEBUG) && defined(BUFFER_POOL_DEBUG)
LOGD("BufferPool: allocated %d new buffers, %zu are in use", increase, pool->in_use);
LOGD("BufferPool: allocated %d new large buffers, %zu are in use", increase, pool->in_use);
#endif
}

static void giveMemBackToOs(buffer_pool_t *pool)
static void reChargeSmallBuffers(buffer_pool_t *pool)
{
const size_t decrease = min(pool->len, pool->cap / 2);
const size_t increase = min((pool->cap - pool->small_buffers_len), pool->cap / 2);

for (size_t i = pool->len - decrease; i < pool->len; i++)
{
destroyShiftBuffer(pool->tid, pool->available[i]);
}
pool->len -= decrease;
popMasterPoolItems(pool->small_buffers_mp, (void const **) &(pool->small_buffers[pool->small_buffers_len]),
(pool->small_buffers_len + increase), pool);

pool->small_buffers_len += increase;
#if defined(DEBUG) && defined(BUFFER_POOL_DEBUG)
LOGD("BufferPool: freed %d buffers, %zu are in use", decrease, pool->in_use);
LOGD("BufferPool: allocated %d new small buffers, %zu are in use", increase, pool->in_use);
#endif
#ifdef OS_LINUX
// malloc_trim(0);
}

static void firstCharge(buffer_pool_t *pool)
{
reChargeLargeBuffers(pool);
reChargeSmallBuffers(pool);
}

static void shirinkLargeBuffers(buffer_pool_t *pool)
{
const size_t decrease = min(pool->large_buffers_len, pool->cap / 2);

reuseMasterPoolItems(pool->large_buffers_mp, (void **) &(pool->small_buffers[pool->large_buffers_len - decrease]),
pool->large_buffers_len, pool);

pool->large_buffers_len -= decrease;

#if defined(DEBUG) && defined(BUFFER_POOL_DEBUG)
LOGD("BufferPool: freed %d large buffers, %zu are in use", decrease, pool->in_use);
#endif
}

static void shirinkSmallBuffers(buffer_pool_t *pool)
{
const size_t decrease = min(pool->small_buffers_len, pool->cap / 2);

reuseMasterPoolItems(pool->small_buffers_mp, (void **) &(pool->small_buffers[pool->small_buffers_len - decrease]),
pool->small_buffers_len, pool);

pool->small_buffers_len -= decrease;

#if defined(DEBUG) && defined(BUFFER_POOL_DEBUG)
LOGD("BufferPool: freed %d small buffers, %zu are in use", decrease, pool->in_use);
#endif
}

shift_buffer_t *popBuffer(buffer_pool_t *pool)
{
#if defined(DEBUG) && defined(BYPASS_BUFFERPOOL)
return newShiftBuffer(pool->buffers_size);
return newShiftBuffer(pool->large_buffers_size);
#endif
#if defined(DEBUG) && defined(BUFFER_POOL_DEBUG)
pool->in_use += 1;
#endif

if (pool->len > 0)
if (WW_LIKELY(pool->large_buffers_len > 0))
{
--(pool->len);
return pool->available[pool->len];
--(pool->large_buffers_len);
return pool->large_buffers[pool->large_buffers_len];
}
reCharge(pool);
reChargeLargeBuffers(pool);

--(pool->len);
return pool->available[pool->len];
--(pool->large_buffers_len);
return pool->large_buffers[pool->large_buffers_len];
}

shift_buffer_t *popSmallBuffer(buffer_pool_t *pool)
{
#if defined(DEBUG) && defined(BYPASS_BUFFERPOOL)
return newShiftBuffer(pool->small_buffer_size);
#endif
#if defined(DEBUG) && defined(BUFFER_POOL_DEBUG)
pool->in_use += 1;
#endif

if (WW_LIKELY(pool->small_buffers_len > 0))
{
--(pool->small_buffers_len);
return pool->small_buffers[pool->small_buffers_len];
}
reChargeSmallBuffers(pool);

--(pool->small_buffers_len);
return pool->small_buffers[pool->small_buffers_len];
}

void reuseBuffer(buffer_pool_t *pool, shift_buffer_t *b)
Expand All @@ -117,20 +182,32 @@ void reuseBuffer(buffer_pool_t *pool, shift_buffer_t *b)
return;
#endif

if (isShallow(b))
if (WW_UNLIKELY(isShallow(b)))
{
destroyShiftBuffer(pool->tid, b);
return;
}
#if defined(DEBUG) && defined(BUFFER_POOL_DEBUG)
pool->in_use -= 1;
#endif
if (pool->len > pool->free_threshould)
if (isLargeBuffer(pool, b))
{
if (WW_UNLIKELY(pool->large_buffers_len > pool->free_threshould))
{
shirinkLargeBuffers(pool);
}
reset(b, pool->large_buffers_size);
pool->large_buffers[(pool->large_buffers_len)++] = b;
}
else
{
giveMemBackToOs(pool);
if (WW_UNLIKELY(pool->small_buffers_len > pool->free_threshould))
{
shirinkSmallBuffers(pool);
}
reset(b, pool->small_buffers_size);
pool->small_buffers[(pool->small_buffers_len)++] = b;
}
reset(b, pool->buffers_size);
pool->available[(pool->len)++] = b;
}

shift_buffer_t *appendBufferMerge(buffer_pool_t *pool, shift_buffer_t *restrict b1, shift_buffer_t *restrict b2)
Expand All @@ -149,7 +226,8 @@ shift_buffer_t *appendBufferMerge(buffer_pool_t *pool, shift_buffer_t *restrict
return b2;
}

static buffer_pool_t *allocBufferPool(uint8_t tid, unsigned long bufcount, unsigned int buffer_size) // NOLINT
static buffer_pool_t *allocBufferPool(uint8_t tid, unsigned int bufcount, unsigned int large_buffer_size,
unsigned int small_buffer_size)
{
// stop using pool if you want less, simply uncomment lines in popbuffer and reuseBuffer
assert(bufcount >= 1);
Expand All @@ -158,25 +236,33 @@ static buffer_pool_t *allocBufferPool(uint8_t tid, unsigned long bufcount, unsig
bufcount = 2 * bufcount;

const unsigned long container_len = bufcount * sizeof(shift_buffer_t *);
buffer_pool_t *pool = globalMalloc(sizeof(buffer_pool_t) + container_len);

buffer_pool_t *ptr_pool = globalMalloc(sizeof(buffer_pool_t));

*ptr_pool = (buffer_pool_t) {
.cap = bufcount,
.large_buffers_size = large_buffer_size,
.small_buffers_size = small_buffer_size,
.free_threshould = max(bufcount / 2, (bufcount * 2) / 3),
#if defined(DEBUG) && defined(BUFFER_POOL_DEBUG)
.in_use = 0,
#endif
.large_buffers_mp = newMasterPoolWithCap(bufcount * 2, createLargeBufHandle, destroyLargeBufHandle),
.large_buffers = globalMalloc(container_len),
.small_buffers_mp = newMasterPoolWithCap(bufcount * 2, createSmallBufHandle, destroySmallBufHandle),
.small_buffers = globalMalloc(container_len),
.tid = tid};

#ifdef DEBUG
memset(pool, 0xEE, sizeof(buffer_pool_t) + container_len);
memset(ptr_pool->large_buffers, 0xEE, container_len);
memset(ptr_pool->small_buffers, 0xEE, container_len);
#endif
memset(pool, 0, sizeof(buffer_pool_t));
pool->cap = bufcount;
pool->buffers_size = buffer_size;
pool->free_threshould = max(pool->cap / 2, (pool->cap * 2) / 3);
pool->tid = tid;
firstCharge(pool);
return pool;
}

buffer_pool_t *createBufferPool(uint8_t tid)
{
return allocBufferPool(tid, BUFFERPOOL_CONTAINER_LEN, BUFFER_SIZE);
firstCharge(ptr_pool);
return ptr_pool;
}

buffer_pool_t *createSmallBufferPool(uint8_t tid)
buffer_pool_t *createBufferPool(uint8_t tid)
{
return allocBufferPool(tid, BUFFERPOOL_SMALL_CONTAINER_LEN, BUFFER_SIZE_SMALL);
return allocBufferPool(tid, BUFFERPOOL_CONTAINER_LEN, BUFFER_SIZE, SMALL_BUFSIZE);
}
8 changes: 4 additions & 4 deletions ww/buffer_pool.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#pragma once

#include "master_pool.h"
#include "shiftbuffer.h"
#include <stdatomic.h>

/*
A growable pool, very simple.
preallocates (n) number of buffers at each call to charge(),
users should call popBuffer() when they want a buffer, and later call reuseBuffer when they are done with
the buffer.
Expand All @@ -23,15 +24,14 @@
for performance reasons, this pool dose not inherit from generic_pool, so 80% of the code is the same
but also it has its own differences ofcourse
*/

struct buffer_pool_s;
typedef struct buffer_pool_s buffer_pool_t;

buffer_pool_t *createSmallBufferPool(uint8_t tid);
buffer_pool_t *createBufferPool(uint8_t tid);
shift_buffer_t *popBuffer(buffer_pool_t *pool);
shift_buffer_t *popSmallBuffer(buffer_pool_t *pool);
void reuseBuffer(buffer_pool_t *pool, shift_buffer_t *b);
shift_buffer_t *appendBufferMerge(buffer_pool_t *pool, shift_buffer_t *restrict b1, shift_buffer_t *restrict b2);

Expand Down

0 comments on commit 496706b

Please sign in to comment.