Skip to content

Commit

Permalink
create generic pool & use it for context , lines
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Jun 3, 2024
1 parent 0e1cc60 commit 2637821
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 14 deletions.
1 change: 1 addition & 0 deletions ww/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ add_library(ww STATIC
buffer_stream.c
config_file.c
buffer_pool.c
generic_pool.c
http_def.c
cacert.c
sync_dns.c
Expand Down
3 changes: 1 addition & 2 deletions ww/buffer_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

#define BUFFER_SIZE (BASE_READ_BUFSIZE + (BASE_READ_BUFSIZE * BUFFER_SIZE_MORE)) // [8k,32k]


#define BYPASS_POOL 0

// NOLINTEND
Expand Down Expand Up @@ -78,7 +77,7 @@ shift_buffer_t *popBuffer(buffer_pool_t *pool)
#if BYPASS_POOL
return newShiftBuffer(BUFFER_SIZE);
#endif

if (pool->len <= 0)
{
reCharge(pool);
Expand Down
2 changes: 2 additions & 0 deletions ww/buffer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
so the pool width is affected by ww memory profile
for performance reasons, this pool dose not inherit from generic_pool and 80% of the code is the same
*/

struct buffer_pool_s
Expand Down
51 changes: 51 additions & 0 deletions ww/generic_pool.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#include "generic_pool.h"
#include "utils/mathutils.h"
#include "ww.h"

#define GENERIC_POOL_DEFAULT_WIDTH ((unsigned long) ((16 * ram_profile)))

extern void poolReCharge(generic_pool_t *pool);
extern void poolShrink(generic_pool_t *pool);
extern pool_item_t *popPoolItem(generic_pool_t *pool);
extern void reusePoolItem(generic_pool_t *pool, pool_item_t *b);

static void poolFirstCharge(generic_pool_t *pool)
{
pool->len = pool->cap / 2;
for (size_t i = 0; i < pool->len; i++)
{
pool->available[i] = pool->create_item_handle(pool);
}
}

static generic_pool_t *allocateGenericPool(unsigned long pool_width, PoolItemCreateHandle create_h,
PoolItemDestroyHandle destroy_h)
{

pool_width = (unsigned long) pow(2, floor(log2((double) (max(16, (ssize_t) pool_width)))));
// half of the pool is used, other half is free at startup
pool_width = 2 * pool_width;

const unsigned long container_len = pool_width * sizeof(pool_item_t *);
generic_pool_t *pool = malloc(sizeof(generic_pool_t) + container_len);
#ifdef DEBUG
memset(pool, 0xEE, sizeof(generic_pool_t) + container_len);
#endif
memset(pool, 0, sizeof(generic_pool_t));
pool->cap = pool_width;
pool->free_threshould = (pool->cap * 2) / 3;
pool->create_item_handle = create_h;
pool->destroy_item_handle = destroy_h;
poolFirstCharge(pool);
return pool;
}

generic_pool_t *newGenericPool(PoolItemCreateHandle create_h, PoolItemDestroyHandle destroy_h)
{
return allocateGenericPool(GENERIC_POOL_DEFAULT_WIDTH, create_h, destroy_h);
}
generic_pool_t *newGenericPoolWithSize(unsigned long pool_width, PoolItemCreateHandle create_h,
PoolItemDestroyHandle destroy_h)
{
return allocateGenericPool(pool_width, create_h, destroy_h);
}
132 changes: 132 additions & 0 deletions ww/generic_pool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
#pragma once
#include <stdatomic.h>
#include <stddef.h>
#ifdef OS_UNIX
#include <malloc.h>
#endif

/*
A growable pool, very simple.
preallocates (n) number of buffers at each call to charge(),
recharing is done autmatically and internally.
pool width is affected by ww memory profile
when DEBUG is true, you can:
define POOL_DEBUG to view debug logs about how many itmes are in use or in pool
define BYPASS_POOL to bypass the pool for your debug goals or leak finding
*/

struct generic_pool_s;

typedef void pool_item_t;
typedef pool_item_t *(*PoolItemCreateHandle)(struct generic_pool_s *pool);
typedef void (*PoolItemDestroyHandle)(struct generic_pool_s *pool, pool_item_t *item);

#if defined(DEBUG) && defined(POOL_DEBUG)
#define GENERIC_POOL_FIELDS \
unsigned int len; \
unsigned int cap; \
unsigned int free_threshould; \
atomic_size_t in_use; \
PoolItemCreateHandle create_item_handle; \
PoolItemDestroyHandle destroy_item_handle; \
pool_item_t *available[];
#else

#define GENERIC_POOL_FIELDS \
unsigned int len; \
unsigned int cap; \
unsigned int free_threshould; \
PoolItemCreateHandle create_item_handle; \
PoolItemDestroyHandle destroy_item_handle; \
pool_item_t *available[];

#endif

struct generic_pool_s
{
GENERIC_POOL_FIELDS
};

typedef struct generic_pool_s generic_pool_t;

inline void poolReCharge(generic_pool_t *pool)
{
const size_t increase = ((pool->cap - pool->len) < (pool->cap) / 2 ? (pool->cap - pool->len) : (pool->cap / 2));

for (size_t i = pool->len; i < (pool->len + increase); i++)
{
pool->available[i] = pool->create_item_handle(pool);
}
pool->len += increase;
#if defined(DEBUG) && defined(POOL_DEBUG)
LOGD("BufferPool: allocated %d new buffers, %zu are in use", increase, pool->in_use);
#endif
}

inline void poolShrink(generic_pool_t *pool)
{
const size_t decrease = (pool->len < (pool->cap / 2) ? pool->len : (pool->cap / 2));

for (size_t i = pool->len - decrease; i < pool->len; i++)
{
pool->destroy_item_handle(pool, pool->available[i]);
}
pool->len -= decrease;

#if defined(DEBUG) && defined(POOL_DEBUG)
LOGD("BufferPool: freed %d buffers, %zu are in use", decrease, pool->in_use);
#endif
#ifdef OS_UNIX
malloc_trim(0);
#endif
}

inline pool_item_t *popPoolItem(generic_pool_t *pool)
{
#if defined(DEBUG) && defined(BYPASS_POOL)
return pool->create_item_handle(pool);
#endif

if (pool->len <= 0)
{
poolReCharge(pool);
}

#if defined(DEBUG) && defined(POOL_DEBUG)
pool->in_use += 1;
#endif
--(pool->len);
return pool->available[pool->len];
}

inline void reusePoolItem(generic_pool_t *pool, pool_item_t *b)
{
#if defined(DEBUG) && defined(BYPASS_POOL)
pool->destroy_item_handle(pool, b);
return;
#endif

#if defined(DEBUG) && defined(POOL_DEBUG)
pool->in_use -= 1;
#endif
if (pool->len > pool->free_threshould)
{
poolShrink(pool);
}

pool->available[(pool->len)++] = b;
}

generic_pool_t *newGenericPool(PoolItemCreateHandle create_h, PoolItemDestroyHandle destroy_h);
generic_pool_t *newGenericPoolWithSize(unsigned long pool_width, PoolItemCreateHandle create_h,
PoolItemDestroyHandle destroy_h);

#undef BYPASS_POOL
#undef POOL_DEBUG
4 changes: 2 additions & 2 deletions ww/shiftbuffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ shift_buffer_t *newShiftBuffer(unsigned int pre_cap)
{
if (pre_cap != 0 && pre_cap % 16 != 0)
{
pre_cap = (unsigned int) pow(2, ceil(16 + log2(pre_cap)));
pre_cap = (unsigned int) pow(2, ceil(log2(16 + pre_cap)));
}

unsigned int real_cap = pre_cap * 2;
Expand Down Expand Up @@ -89,7 +89,7 @@ void reset(shift_buffer_t *self, unsigned int pre_cap)
assert(! isShallow(self));
if (pre_cap != 0 && pre_cap % 16 != 0)
{
pre_cap = (unsigned int) pow(2, ceil(16 + log2(pre_cap)));
pre_cap = (unsigned int) pow(2, ceil(log2(16 + pre_cap)));
}
unsigned int real_cap = pre_cap * 2;

Expand Down
23 changes: 23 additions & 0 deletions ww/tunnel.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,29 @@ tunnel_t *newTunnel(void)
return t;
}

pool_item_t *allocLinePoolHandle(struct generic_pool_s *pool)
{
(void) pool;
return malloc(sizeof(line_t));
}
void destroyLinePoolHandle(struct generic_pool_s *pool, pool_item_t *item)
{
(void) pool;
free(item);
}

pool_item_t *allocContextPoolHandle(struct generic_pool_s *pool)
{
(void) pool;
return malloc(sizeof(context_t));
}

void destroyContextPoolHandle(struct generic_pool_s *pool, pool_item_t *item)
{
(void) pool;
free(item);
}

void defaultUpStream(tunnel_t *self, context_t *c)
{
if (self->up != NULL)
Expand Down
19 changes: 13 additions & 6 deletions ww/tunnel.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "basic_types.h"
#include "buffer_pool.h"
#include "generic_pool.h"
#include "hloop.h"
#include "shiftbuffer.h"
#include "ww.h"
Expand Down Expand Up @@ -106,10 +107,12 @@ void chainUp(tunnel_t *from, tunnel_t *to);
void defaultUpStream(tunnel_t *self, context_t *c);
void defaultDownStream(tunnel_t *self, context_t *c);

pool_item_t *allocLinePoolHandle(struct generic_pool_s *pool);
void destroyLinePoolHandle(struct generic_pool_s *pool, pool_item_t *item);

inline line_t *newLine(uint8_t tid)
{
size_t size = sizeof(line_t);
line_t *result = malloc(size);
line_t *result = popPoolItem(line_pools[tid]);

*result = (line_t){
.tid = tid,
Expand Down Expand Up @@ -219,7 +222,7 @@ inline void internalUnRefLine(line_t *l)
free(l->dest_ctx.domain);
}

free(l);
reusePoolItem(line_pools[l->tid], l);
}

inline void lockLine(line_t *line)
Expand All @@ -238,16 +241,20 @@ inline void destroyLine(line_t *l)
unLockLine(l);
}

pool_item_t *allocContextPoolHandle(struct generic_pool_s *pool);
void destroyContextPoolHandle(struct generic_pool_s *pool, pool_item_t *item);

inline void destroyContext(context_t *c)
{
assert(c->payload == NULL);
const uint8_t tid = c->line->tid;
unLockLine(c->line);
free(c);
reusePoolItem(context_pools[tid], c);
}

inline context_t *newContext(line_t *line)
{
context_t *new_ctx = malloc(sizeof(context_t));
context_t *new_ctx = popPoolItem(context_pools[line->tid]);
*new_ctx = (context_t){.line = line};
lockLine(line);
return new_ctx;
Expand All @@ -256,7 +263,7 @@ inline context_t *newContext(line_t *line)
inline context_t *newContextFrom(context_t *source)
{
lockLine(source->line);
context_t *new_ctx = malloc(sizeof(context_t));
context_t *new_ctx = popPoolItem(context_pools[source->line->tid]);
*new_ctx = (context_t){.line = source->line};
return new_ctx;
}
Expand Down
23 changes: 19 additions & 4 deletions ww/ww.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "ww.h"
#include "buffer_pool.h"
#include "generic_pool.h"
#include "hlog.h"
#include "hloop.h"
#include "hplatform.h"
Expand All @@ -25,6 +26,8 @@ unsigned int frand_seed = 0;
unsigned int ram_profile = 0;
struct hloop_s **loops = NULL;
struct buffer_pool_s **buffer_pools = NULL;
struct generic_pool_s **context_pools = NULL;
struct generic_pool_s **line_pools = NULL;
struct socket_manager_s *socekt_manager = NULL;
struct node_manager_s *node_manager = NULL;
logger_t *core_logger = NULL;
Expand All @@ -39,6 +42,8 @@ struct ww_runtime_state_s
unsigned int ram_profile;
struct hloop_s **loops;
struct buffer_pool_s **buffer_pools;
struct generic_pool_s **context_pools;
struct generic_pool_s **line_pools;
struct socket_manager_s *socekt_manager;
struct node_manager_s *node_manager;
logger_t *core_logger;
Expand All @@ -54,6 +59,8 @@ void setWW(struct ww_runtime_state_s *state)
ram_profile = state->ram_profile;
loops = state->loops;
buffer_pools = state->buffer_pools;
context_pools = state->context_pools;
line_pools = state->line_pools;
socekt_manager = state->socekt_manager;
node_manager = state->node_manager;
setCoreLogger(state->core_logger);
Expand All @@ -74,6 +81,8 @@ struct ww_runtime_state_s *getWW(void)
state->ram_profile = ram_profile;
state->loops = loops;
state->buffer_pools = buffer_pools;
state->context_pools = context_pools;
state->line_pools = line_pools;
state->socekt_manager = socekt_manager;
state->node_manager = node_manager;
state->core_logger = core_logger;
Expand Down Expand Up @@ -153,14 +162,20 @@ void createWW(ww_construction_data_t init_data)
fprintf(stderr, "workers count was not in valid range, value: %u range:[1 - 255]\n", workers_count);
}

workers = (hthread_t *) malloc(sizeof(hthread_t) * workers_count);
frand_seed = time(NULL);
ram_profile = init_data.ram_profile;
buffer_pools = (struct buffer_pool_s **) malloc(sizeof(struct buffer_pool_s *) * workers_count);
workers = (hthread_t *) malloc(sizeof(hthread_t) * workers_count);
frand_seed = time(NULL);
ram_profile = init_data.ram_profile;
buffer_pools = (struct buffer_pool_s **) malloc(sizeof(struct buffer_pool_s *) * workers_count);
context_pools = (struct generic_pool_s **) malloc(sizeof(struct generic_pool_s *) * workers_count);
line_pools = (struct generic_pool_s **) malloc(sizeof(struct generic_pool_s *) * workers_count);

for (unsigned int i = 0; i < workers_count; ++i)
{
buffer_pools[i] = createBufferPool();
context_pools[i] =
newGenericPoolWithSize((16 * 8) + ram_profile, allocContextPoolHandle, destroyContextPoolHandle);
context_pools[i] =
newGenericPoolWithSize((16 * 4) + ram_profile, allocContextPoolHandle, destroyContextPoolHandle);
}

loops = (hloop_t **) malloc(sizeof(hloop_t *) * workers_count);
Expand Down
Loading

0 comments on commit 2637821

Please sign in to comment.