Skip to content

Commit

Permalink
Finish integrating master pool and tested a little bit
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Aug 3, 2024
1 parent 2e63a7d commit d6fa120
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 53 deletions.
32 changes: 16 additions & 16 deletions ww/buffer_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ static void reChargeLargeBuffers(buffer_pool_t *pool)
const size_t increase = min((pool->cap - pool->large_buffers_len), pool->cap / 2);

popMasterPoolItems(pool->large_buffers_mp, (void const **) &(pool->large_buffers[pool->large_buffers_len]),
increase, pool);
increase, pool);

pool->large_buffers_len += increase;
#if defined(DEBUG) && defined(BUFFER_POOL_DEBUG)
Expand All @@ -93,7 +93,7 @@ static void reChargeSmallBuffers(buffer_pool_t *pool)
const size_t increase = min((pool->cap - pool->small_buffers_len), pool->cap / 2);

popMasterPoolItems(pool->small_buffers_mp, (void const **) &(pool->small_buffers[pool->small_buffers_len]),
increase, pool);
increase, pool);

pool->small_buffers_len += increase;
#if defined(DEBUG) && defined(BUFFER_POOL_DEBUG)
Expand Down Expand Up @@ -226,7 +226,8 @@ shift_buffer_t *appendBufferMerge(buffer_pool_t *pool, shift_buffer_t *restrict
return b2;
}

static buffer_pool_t *allocBufferPool(struct master_pool_s* mp_large,struct master_pool_s* mp_small,uint8_t tid, unsigned int bufcount, unsigned int large_buffer_size,
static buffer_pool_t *allocBufferPool(struct master_pool_s *mp_large, struct master_pool_s *mp_small, uint8_t tid,
unsigned int bufcount, unsigned int large_buffer_size,
unsigned int small_buffer_size)
{
// stop using pool if you want less, simply uncomment lines in popbuffer and reuseBuffer
Expand All @@ -239,19 +240,18 @@ static buffer_pool_t *allocBufferPool(struct master_pool_s* mp_large,struct mast

buffer_pool_t *ptr_pool = globalMalloc(sizeof(buffer_pool_t));

*ptr_pool = (buffer_pool_t) {
.cap = bufcount,
.large_buffers_size = large_buffer_size,
.small_buffers_size = small_buffer_size,
.free_threshold = max(bufcount / 2, (bufcount * 2) / 3),
*ptr_pool = (buffer_pool_t) {.cap = bufcount,
.large_buffers_size = large_buffer_size,
.small_buffers_size = small_buffer_size,
.free_threshold = max(bufcount / 2, (bufcount * 2) / 3),
#if defined(DEBUG) && defined(BUFFER_POOL_DEBUG)
.in_use = 0,
.in_use = 0,
#endif
.large_buffers_mp = mp_large,
.large_buffers = globalMalloc(container_len),
.small_buffers_mp = mp_small,
.small_buffers = globalMalloc(container_len),
.tid = tid};
.large_buffers_mp = mp_large,
.large_buffers = globalMalloc(container_len),
.small_buffers_mp = mp_small,
.small_buffers = globalMalloc(container_len),
.tid = tid};

installMasterPoolAllocCallbacks(ptr_pool->large_buffers_mp, createLargeBufHandle, destroyLargeBufHandle);
installMasterPoolAllocCallbacks(ptr_pool->small_buffers_mp, createSmallBufHandle, destroySmallBufHandle);
Expand All @@ -265,7 +265,7 @@ static buffer_pool_t *allocBufferPool(struct master_pool_s* mp_large,struct mast
return ptr_pool;
}

buffer_pool_t *createBufferPool(struct master_pool_s* mp_large,struct master_pool_s* mp_small,uint8_t tid)
buffer_pool_t *createBufferPool(struct master_pool_s *mp_large, struct master_pool_s *mp_small, uint8_t tid)
{
return allocBufferPool(mp_large,mp_small,tid, BUFFERPOOL_CONTAINER_LEN, BUFFER_SIZE, SMALL_BUFSIZE);
return allocBufferPool(mp_large, mp_small, tid, BUFFERPOOL_CONTAINER_LEN, BUFFER_SIZE, SMALL_BUFSIZE);
}
14 changes: 10 additions & 4 deletions ww/generic_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,22 @@ static master_pool_item_t *poolCreateItemHandle(struct master_pool_s *pool, void
{
(void) pool;
generic_pool_t *gpool = userdata;
ifgp

return gpool->create_item_handle(gpool);
if (gpool->item_size == 0)
{
return gpool->create_item_handle(gpool);
}
return globalMalloc(gpool->item_size);
}

static void poolDestroyItemHandle(struct master_pool_s *pool, master_pool_item_t *item, void *userdata)
{
(void) pool;
generic_pool_t *gpool = userdata;
gpool->destroy_item_handle(gpool, item);
if (gpool->item_size == 0)
{
gpool->destroy_item_handle(gpool, item);
}
globalFree(item);
}

void poolReCharge(generic_pool_t *pool)
Expand Down
2 changes: 1 addition & 1 deletion ww/managers/memory_manager.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#define ALLOCATOR_BYPASS // switch to stdlib allocators
// #define ALLOCATOR_BYPASS // switch to stdlib allocators

#include <stdbool.h>
#include <stddef.h>
Expand Down
34 changes: 18 additions & 16 deletions ww/managers/socket_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ void registerSocketAcceptor(tunnel_t *tunnel, socket_filter_option_t option, onA
option.shared_balance_table = b_table;
}

*filter = (socket_filter_t){.tunnel = tunnel, .option = option, .cb = cb, .listen_io = NULL};
*filter = (socket_filter_t) {.tunnel = tunnel, .option = option, .cb = cb, .listen_io = NULL};

hhybridmutex_lock(&(state->mutex));
filters_t_push(&(state->filters[pirority]), filter);
Expand Down Expand Up @@ -467,7 +467,7 @@ static void distributeSocket(void *io, socket_filter_t *filter, uint16_t local_p
result->real_localport = local_port;

hloop_t *worker_loop = getWorkerLoop(tid);
hevent_t ev = (hevent_t){.loop = worker_loop, .cb = filter->cb};
hevent_t ev = (hevent_t) {.loop = worker_loop, .cb = filter->cb};
result->tid = tid;
result->io = io;
result->tunnel = filter->tunnel;
Expand Down Expand Up @@ -826,7 +826,7 @@ static void postPayload(udp_payload_t post_pl, socket_filter_t *filter)

pl->tunnel = filter->tunnel;
hloop_t *worker_loop = getWorkerLoop(pl->tid);
hevent_t ev = (hevent_t){.loop = worker_loop, .cb = filter->cb};
hevent_t ev = (hevent_t) {.loop = worker_loop, .cb = filter->cb};
ev.userdata = (void *) pl;

hloop_post_event(worker_loop, &ev);
Expand Down Expand Up @@ -922,13 +922,13 @@ static void onRecvFrom(hio_t *io, shift_buffer_t *buf)
{
udpsock_t *socket = hevent_userdata(io);
uint16_t local_port = sockaddr_port((sockaddr_u *) hio_localaddr_u(io));
uint8_t target_tid = local_port % getWorkersCount();
uint8_t target_tid = local_port % getWorkersCount();

udp_payload_t item = (udp_payload_t){.sock = socket,
.buf = buf,
.tid = target_tid,
.peer_addr = *(sockaddr_u *) hio_peeraddr_u(io),
.real_localport = local_port};
udp_payload_t item = (udp_payload_t) {.sock = socket,
.buf = buf,
.tid = target_tid,
.peer_addr = *(sockaddr_u *) hio_peeraddr_u(io),
.real_localport = local_port};

distributeUdpPayload(item);
}
Expand All @@ -950,7 +950,7 @@ static void listenUdpSinglePort(hloop_t *loop, socket_filter_t *filter, char *ho
exit(1);
}
udpsock_t *socket = globalMalloc(sizeof(udpsock_t));
*socket = (udpsock_t){.io = filter->listen_io, .table = newIdleTable(loop)};
*socket = (udpsock_t) {.io = filter->listen_io, .table = newIdleTable(loop)};
hevent_set_userdata(filter->listen_io, socket);
hio_setcb_read(filter->listen_io, onRecvFrom);
hio_read(filter->listen_io);
Expand Down Expand Up @@ -1014,9 +1014,9 @@ void postUdpWrite(udpsock_t *socket_io, uint8_t tid_from, shift_buffer_t *buf)

udp_payload_t *item = newUpdPayload(tid_from);

*item = (udp_payload_t){.sock = socket_io, .buf = buf, .tid = tid_from};
*item = (udp_payload_t) {.sock = socket_io, .buf = buf, .tid = tid_from};

hevent_t ev = (hevent_t){.loop = hevent_loop(socket_io->io), .userdata = item, .cb = writeUdpThisLoop};
hevent_t ev = (hevent_t) {.loop = hevent_loop(socket_io->io), .userdata = item, .cb = writeUdpThisLoop};

hloop_post_event(hevent_loop(socket_io->io), &ev);
}
Expand Down Expand Up @@ -1085,15 +1085,17 @@ socket_manager_state_t *createSocketManager(worker_t *worker)

state->tcp_pools = globalMalloc(sizeof(*state->tcp_pools) * getWorkersCount());
memset(state->tcp_pools, 0, sizeof(*state->tcp_pools) * getWorkersCount());

master_pool_t *mp_udp = newMasterPoolWithCap(2 * ((8) + RAM_PROFILE));
master_pool_t *mp_tcp = newMasterPoolWithCap(2 * ((8) + RAM_PROFILE));
for (unsigned int i = 0; i < getWorkersCount(); ++i)
{

state->udp_pools[i].pool =
newGenericPoolWithCap((8) + RAM_PROFILE, allocUdpPayloadPoolHandle, destroyUdpPayloadPoolHandle);
newGenericPoolWithCap(mp_udp, (8) + RAM_PROFILE, allocUdpPayloadPoolHandle, destroyUdpPayloadPoolHandle);
hhybridmutex_init(&(state->udp_pools[i].mutex));

state->tcp_pools[i].pool =
newGenericPoolWithCap((8) + RAM_PROFILE, allocTcpResultObjectPoolHandle, destroyTcpResultObjectPoolHandle);
state->tcp_pools[i].pool = newGenericPoolWithCap(mp_tcp, (8) + RAM_PROFILE, allocTcpResultObjectPoolHandle,
destroyTcpResultObjectPoolHandle);
hhybridmutex_init(&(state->tcp_pools[i].mutex));
}

Expand Down
2 changes: 1 addition & 1 deletion ww/master_pool.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include "master_pool.h"
#include "ww.h"

static void defaultCreateHandle(struct master_pool_s *pool, void *userdata)
static master_pool_item_t* defaultCreateHandle(struct master_pool_s *pool, void *userdata)
{
(void) pool;
(void) userdata;
Expand Down
44 changes: 29 additions & 15 deletions ww/ww.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ static void initalizeSocketManagerWorker(worker_t *worker, tid_t tid)
{
*worker = (worker_t) {.tid = tid};

worker->shift_buffer_pool =
newGenericPoolWithCap((64) + GSTATE.ram_profile, allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle);
worker->shift_buffer_pool = newGenericPoolWithCap(GSTATE.masterpool_shift_buffer_pools, (64) + GSTATE.ram_profile,
allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle);
GSTATE.shortcut_shift_buffer_pools[tid] = getWorker(tid)->shift_buffer_pool;

worker->buffer_pool = createBufferPool(worker->tid);
worker->buffer_pool =
createBufferPool(GSTATE.masterpool_buffer_pools_large, GSTATE.masterpool_buffer_pools_small, worker->tid);
GSTATE.shortcut_buffer_pools[tid] = getWorker(tid)->buffer_pool;

worker->loop = hloop_new(HLOOP_FLAG_AUTO_FREE, worker->buffer_pool, 0);
Expand All @@ -65,25 +66,27 @@ static void initalizeWorker(worker_t *worker, tid_t tid)
{
*worker = (worker_t) {.tid = tid};

worker->shift_buffer_pool =
newGenericPoolWithCap((64) + GSTATE.ram_profile, allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle);
worker->shift_buffer_pool = newGenericPoolWithCap(GSTATE.masterpool_shift_buffer_pools, (64) + GSTATE.ram_profile,
allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle);
GSTATE.shortcut_shift_buffer_pools[tid] = getWorker(tid)->shift_buffer_pool;

worker->buffer_pool = createBufferPool(worker->tid);
worker->buffer_pool =
createBufferPool(GSTATE.masterpool_buffer_pools_large, GSTATE.masterpool_buffer_pools_small, worker->tid);
GSTATE.shortcut_buffer_pools[tid] = getWorker(tid)->buffer_pool;

worker->loop = hloop_new(HLOOP_FLAG_AUTO_FREE, worker->buffer_pool, 0);
GSTATE.shortcut_loops[tid] = getWorker(tid)->loop;

worker->context_pool =
newGenericPoolWithCap((16) + GSTATE.ram_profile, allocContextPoolHandle, destroyContextPoolHandle);
worker->context_pool = newGenericPoolWithCap(GSTATE.masterpool_context_pools, (16) + GSTATE.ram_profile,
allocContextPoolHandle, destroyContextPoolHandle);
GSTATE.shortcut_context_pools[tid] = getWorker(tid)->context_pool;

worker->line_pool = newGenericPoolWithCap((8) + GSTATE.ram_profile, allocLinePoolHandle, destroyLinePoolHandle);
worker->line_pool = newGenericPoolWithCap(GSTATE.masterpool_line_pools, (8) + GSTATE.ram_profile,
allocLinePoolHandle, destroyLinePoolHandle);
GSTATE.shortcut_line_pools[tid] = getWorker(tid)->line_pool;

worker->pipeline_msg_pool =
newGenericPoolWithCap((8) + GSTATE.ram_profile, allocPipeLineMsgPoolHandle, destroyPipeLineMsgPoolHandle);
worker->pipeline_msg_pool = newGenericPoolWithCap(GSTATE.masterpool_pipeline_msg_pools, (8) + GSTATE.ram_profile,
allocPipeLineMsgPoolHandle, destroyPipeLineMsgPoolHandle);
GSTATE.shortcut_pipeline_msg_pools[tid] = getWorker(tid)->pipeline_msg_pool;
}

Expand Down Expand Up @@ -128,10 +131,8 @@ static void initializeShortCuts(void)
{
assert(GSTATE.initialized);

tid_t workers_count = GSTATE.workers_count;

static const int kShourtcutsCount = 6;
const int total_workers = workers_count + kAdditionalReservedWorkers;
const int total_workers = WORKERS_COUNT + kAdditionalReservedWorkers;

void **space = globalMalloc(sizeof(void *) * kShourtcutsCount * total_workers);

Expand All @@ -143,6 +144,18 @@ static void initializeShortCuts(void)
GSTATE.shortcut_pipeline_msg_pools = (generic_pool_t **) (space + (5UL * total_workers));
}

static void initializeMasterPools(void)
{
assert(GSTATE.initialized);

GSTATE.masterpool_shift_buffer_pools = newMasterPoolWithCap(2 * ((64) + GSTATE.ram_profile));
GSTATE.masterpool_buffer_pools_large = newMasterPoolWithCap(2 * ((0) + GSTATE.ram_profile));
GSTATE.masterpool_buffer_pools_small = newMasterPoolWithCap(2 * ((0) + GSTATE.ram_profile));
GSTATE.masterpool_context_pools = newMasterPoolWithCap(2 * ((16) + GSTATE.ram_profile));
GSTATE.masterpool_line_pools = newMasterPoolWithCap(2 * ((8) + GSTATE.ram_profile));
GSTATE.masterpool_pipeline_msg_pools = newMasterPoolWithCap(2 * ((8) + GSTATE.ram_profile));
}

void createWW(const ww_construction_data_t init_data)
{
GSTATE.initialized = true;
Expand Down Expand Up @@ -191,9 +204,10 @@ void createWW(const ww_construction_data_t init_data)
WORKERS_COUNT = (255 - kAdditionalReservedWorkers);
}

WORKERS = (worker_t *) malloc(sizeof(worker_t) * (WORKERS_COUNT + kAdditionalReservedWorkers));
WORKERS = (worker_t *) globalMalloc(sizeof(worker_t) * (WORKERS_COUNT + kAdditionalReservedWorkers));

initializeShortCuts();
initializeMasterPools();

for (unsigned int i = 0; i < WORKERS_COUNT; ++i)
{
Expand Down
6 changes: 6 additions & 0 deletions ww/ww.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ typedef struct ww_global_state_s
struct generic_pool_s **shortcut_context_pools;
struct generic_pool_s **shortcut_line_pools;
struct generic_pool_s **shortcut_pipeline_msg_pools;
struct master_pool_s *masterpool_buffer_pools_large;
struct master_pool_s *masterpool_buffer_pools_small;
struct master_pool_s *masterpool_shift_buffer_pools;
struct master_pool_s *masterpool_context_pools;
struct master_pool_s *masterpool_line_pools;
struct master_pool_s *masterpool_pipeline_msg_pools;
struct worker_s *workers;
struct socket_manager_s *socekt_manager;
struct node_manager_s *node_manager;
Expand Down

0 comments on commit d6fa120

Please sign in to comment.