diff --git a/ww/master_pool.c b/ww/master_pool.c index e3105b7..65a742c 100644 --- a/ww/master_pool.c +++ b/ww/master_pool.c @@ -1,18 +1,7 @@ #include "master_pool.h" -static void poolFirstCharge(master_pool_t *pool) -{ - hhybridmutex_lock(&(pool->mutex)); - pool->len = pool->cap / 2; - for (size_t i = 0; i < pool->len; i++) - { - pool->available[i] = pool->create_item_handle(pool); - } - hhybridmutex_unlock(&(pool->mutex)); -} - -master_pool_t *newMasterPoolWithCap(unsigned int pool_width, MasterPoolItemCreateHandle create_h, - MasterPoolItemDestroyHandle destroy_h) +master_pool_t *newMasterPoolWithCap(unsigned int pool_width, MasterPoolItemCreateHandle const create_h, + MasterPoolItemDestroyHandle const destroy_h) { pool_width = (max(1, pool_width) + 15) & ~0x0F; @@ -45,11 +34,14 @@ master_pool_t *newMasterPoolWithCap(unsigned int pool_width, MasterPoolItemCreat memset(pool_ptr, 0xEB, sizeof(master_pool_t) + container_len); #endif - master_pool_t pool = { - .memptr = pool_ptr, .cap = pool_width, .create_item_handle = create_h, .destroy_item_handle = destroy_h}; + master_pool_t pool = {.memptr = pool_ptr, + .cap = pool_width, + .len = 0, + .create_item_handle = create_h, + .destroy_item_handle = destroy_h}; memcpy(pool_ptr, &pool, sizeof(master_pool_t)); hhybridmutex_init(&(pool_ptr->mutex)); - poolFirstCharge(pool_ptr); + return pool_ptr; } diff --git a/ww/master_pool.h b/ww/master_pool.h index 9709510..a3832fa 100644 --- a/ww/master_pool.h +++ b/ww/master_pool.h @@ -4,15 +4,30 @@ #include "utils/mathutils.h" #include "ww.h" +/* + Master Pool + + In some cases, workers need to send data/buffers to each other, while each has a thread-local pool + + therefore, thread-local pools may keep running out of items, and there is a need for a thread-safe pool + + thread-local pools will fall back to the master pool instead of allocating more memory or freeing it and + interacting with the OS (malloc/free); a full charge is obtained in a single batch allocation with only 1 mutex lock + 
+ + +*/ + struct master_pool_s; typedef void master_pool_item_t; -typedef master_pool_item_t *(*MasterPoolItemCreateHandle)(struct master_pool_s *pool); -typedef void (*MasterPoolItemDestroyHandle)(struct master_pool_s *pool, master_pool_item_t *item); +typedef master_pool_item_t *(*MasterPoolItemCreateHandle)(struct master_pool_s *pool,void* userdata); +typedef void (*MasterPoolItemDestroyHandle)(struct master_pool_s *pool, master_pool_item_t *item,void* userdata); /* do not read this pool properties from the struct, its a multi-threaded object */ + typedef struct master_pool_s { void *memptr; @@ -20,11 +35,12 @@ typedef struct master_pool_s MasterPoolItemCreateHandle create_item_handle; MasterPoolItemDestroyHandle destroy_item_handle; atomic_uint len; - unsigned int cap; // fixed + const unsigned int cap; void *available[]; } master_pool_t; -static inline void popMasterPoolItems(master_pool_t *pool, master_pool_item_t **iptr, unsigned int count) +static inline void popMasterPoolItems(master_pool_t *const pool, master_pool_item_t const **const iptr, + const unsigned int count,void* userdata) { if (atomic_load_explicit(&(pool->len), memory_order_relaxed) > 0) @@ -36,36 +52,39 @@ static inline void popMasterPoolItems(master_pool_t *pool, master_pool_item_t ** { atomic_fetch_add_explicit(&(pool->len), -consumed, memory_order_relaxed); const unsigned int pbase = (tmp_len - consumed); - unsigned int i = 0; + unsigned int i = 0; for (; i < consumed; i++) { iptr[i] = pool->available[pbase + i]; } for (; i < count; i++) { - iptr[i] = pool->create_item_handle(pool); + iptr[i] = pool->create_item_handle(pool,userdata); } } hhybridmutex_unlock(&(pool->mutex)); return; } + for (unsigned int i = 0; i < count; i++) { - iptr[i] = pool->create_item_handle(pool); + iptr[i] = pool->create_item_handle(pool,userdata); } } -static inline void reuseMasterPoolItems(master_pool_t *pool, master_pool_item_t **iptr, unsigned int count) +static inline void reuseMasterPoolItems(master_pool_t 
*const pool, master_pool_item_t **const iptr, + const unsigned int count,void* userdata) { if (pool->cap - atomic_load_explicit(&(pool->len), memory_order_relaxed) == 0) { for (unsigned int i = 0; i < count; i++) { - pool->destroy_item_handle(pool, iptr[i]); + pool->destroy_item_handle(pool, iptr[i],userdata); } return; } + hhybridmutex_lock(&(pool->mutex)); const unsigned int tmp_len = atomic_load_explicit(&(pool->len), memory_order_relaxed); @@ -82,9 +101,10 @@ static inline void reuseMasterPoolItems(master_pool_t *pool, master_pool_item_t } for (; i < count; i++) { - pool->destroy_item_handle(pool, iptr[i]); + pool->destroy_item_handle(pool, iptr[i],userdata); } } + hhybridmutex_unlock(&(pool->mutex)); }