Skip to content

Commit

Permalink
rework pools
Browse files Browse the repository at this point in the history
  • Loading branch information
radkesvat committed Aug 13, 2024
1 parent 3e0e63f commit 5276627
Show file tree
Hide file tree
Showing 18 changed files with 140 additions and 103 deletions.
11 changes: 8 additions & 3 deletions tunnels/adapters/device/capture/caputre_device.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,14 @@ static void upStream(tunnel_t *self, context_t *c)
capture_device_state_t *state = TSTATE((tunnel_t *) self);

capture_device_t *cdev = state->cdev;
writeToCaptureDevce(cdev, c->payload);

dropContexPayload(c);
if (! writeToCaptureDevce(cdev, c->payload))
{
reuseContextPayload(c);
}
else
{
dropContexPayload(c);
}
destroyContext(c);
}

Expand Down
13 changes: 9 additions & 4 deletions tunnels/adapters/device/raw/raw_device.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,14 @@ static void upStream(tunnel_t *self, context_t *c)
raw_device_state_t *state = TSTATE((tunnel_t *) self);

raw_device_t *rdev = state->rdev;
writeToRawDevce(rdev, c->payload);

dropContexPayload(c);
if (! writeToRawDevce(rdev, c->payload))
{
reuseContextPayload(c);
}
else
{
dropContexPayload(c);
}
destroyContext(c);
}

Expand Down Expand Up @@ -124,7 +129,7 @@ tunnel_t *newRawDevice(node_instance_context_t *instance_info)
}

// not forced
getStringFromJsonObjectOrDefault(&(state->name), settings, "device-name","unnamed-device");
getStringFromJsonObjectOrDefault(&(state->name), settings, "device-name", "unnamed-device");
uint32_t fwmark = 0;
getIntFromJsonObjectOrDefault((int *) &fwmark, settings, "mark", 0);

Expand Down
14 changes: 9 additions & 5 deletions tunnels/adapters/device/tun/tun_device.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,20 @@ static void printIPPacketInfo(const unsigned char *buffer, unsigned int len)
LOGD(logbuf);
}



static void upStream(tunnel_t *self, context_t *c)
{
tun_device_state_t *state = TSTATE((tunnel_t *) self);

tun_device_t *tdev = state->tdev;
writeToTunDevce(tdev, c->payload);
if (! writeToTunDevce(tdev, c->payload))
{
reuseContextPayload(c);
}
else
{
dropContexPayload(c);
}

dropContexPayload(c);
destroyContext(c);
}

Expand All @@ -99,7 +103,7 @@ static void onIPPacketReceived(struct tun_device_s *tdev, void *userdata, shift_
tun_device_state_t *state = TSTATE((tunnel_t *) self);

#if LOG_PACKET_INFO
printIPPacketInfo(rawBuf(buf),bufLen(buf));
printIPPacketInfo(rawBuf(buf), bufLen(buf));
#endif

// reuseBuffer(getWorkerBufferPool(tid), buf);
Expand Down
2 changes: 1 addition & 1 deletion tunnels/client/http2/http2_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ static void downStream(tunnel_t *self, context_t *c)

if (ret != (ssize_t) consumed)
{
assert(false);
// assert(false);
deleteHttp2Connection(con);
self->up->upStream(self->up, newFinContext(c->line));
reuseContextPayload(c);
Expand Down
2 changes: 1 addition & 1 deletion tunnels/server/http2/http2_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ static void upStream(tunnel_t *self, context_t *c)

if (ret != (ssize_t) consumed)
{
assert(false);
// assert(false);
deleteHttp2Connection(con);
self->dw->downStream(self->dw, newFinContext(c->line));
reuseContextPayload(c);
Expand Down
34 changes: 12 additions & 22 deletions ww/buffer_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,10 @@ struct buffer_pool_s
shift_buffer_t **large_buffers;
master_pool_t *small_buffers_mp;
shift_buffer_t **small_buffers;
uint8_t tid;
};

// NOLINTEND

void reuseBufferThreadSafe(shift_buffer_t *buf)
{
if (isLargeBuffer(buf))
{
reset(buf, getBufferPoolLargeBufferDefaultSize());
reuseMasterPoolItems(GSTATE.masterpool_buffer_pools_large, (void **) &buf, 1);
}
else
{
reset(buf, getBufferPoolSmallBufferDefaultSize());
reuseMasterPoolItems(GSTATE.masterpool_buffer_pools_small, (void **) &buf, 1);
}
}

unsigned int getBufferPoolLargeBufferDefaultSize(void)
{
Expand All @@ -74,6 +60,7 @@ bool isLargeBuffer(shift_buffer_t *buf)
static master_pool_item_t *createLargeBufHandle(struct master_pool_s *pool, void *userdata)
{
(void) pool;

buffer_pool_t *bpool = userdata;
return newShiftBuffer(bpool->shift_buffer_pool, bpool->large_buffers_default_size);
}
Expand Down Expand Up @@ -104,7 +91,7 @@ static void reChargeLargeBuffers(buffer_pool_t *pool)
const size_t increase = min((pool->cap - pool->large_buffers_container_len), pool->cap / 2);

popMasterPoolItems(pool->large_buffers_mp,
(void const **) &(pool->large_buffers[pool->large_buffers_container_len]), increase);
(void const **) &(pool->large_buffers[pool->large_buffers_container_len]), increase, pool);

pool->large_buffers_container_len += increase;
#if defined(DEBUG) && defined(BUFFER_POOL_DEBUG)
Expand All @@ -117,7 +104,7 @@ static void reChargeSmallBuffers(buffer_pool_t *pool)
const size_t increase = min((pool->cap - pool->small_buffers_container_len), pool->cap / 2);

popMasterPoolItems(pool->small_buffers_mp,
(void const **) &(pool->small_buffers[pool->small_buffers_container_len]), increase);
(void const **) &(pool->small_buffers[pool->small_buffers_container_len]), increase, pool);

pool->small_buffers_container_len += increase;
#if defined(DEBUG) && defined(BUFFER_POOL_DEBUG)
Expand All @@ -142,7 +129,8 @@ static void shrinkLargeBuffers(buffer_pool_t *pool)
const size_t decrease = min(pool->large_buffers_container_len, pool->cap / 2);

reuseMasterPoolItems(pool->large_buffers_mp,
(void **) &(pool->large_buffers[pool->large_buffers_container_len - decrease]), decrease);
(void **) &(pool->large_buffers[pool->large_buffers_container_len - decrease]), decrease,
pool);

pool->large_buffers_container_len -= decrease;

Expand All @@ -156,7 +144,8 @@ static void shrinkSmallBuffers(buffer_pool_t *pool)
const size_t decrease = min(pool->small_buffers_container_len, pool->cap / 2);

reuseMasterPoolItems(pool->small_buffers_mp,
(void **) &(pool->small_buffers[pool->small_buffers_container_len - decrease]), decrease);
(void **) &(pool->small_buffers[pool->small_buffers_container_len - decrease]), decrease,
pool);

pool->small_buffers_container_len -= decrease;

Expand Down Expand Up @@ -285,8 +274,8 @@ static buffer_pool_t *allocBufferPool(struct master_pool_s *mp_large, struct mas
.small_buffers = globalMalloc(container_len),
};

installMasterPoolAllocCallbacks(ptr_pool->large_buffers_mp, ptr_pool, createLargeBufHandle, destroyLargeBufHandle);
installMasterPoolAllocCallbacks(ptr_pool->small_buffers_mp, ptr_pool, createSmallBufHandle, destroySmallBufHandle);
installMasterPoolAllocCallbacks(ptr_pool->large_buffers_mp, createLargeBufHandle, destroyLargeBufHandle);
installMasterPoolAllocCallbacks(ptr_pool->small_buffers_mp, createSmallBufHandle, destroySmallBufHandle);

#ifdef DEBUG
memset(ptr_pool->large_buffers, 0xFE, container_len);
Expand All @@ -297,7 +286,8 @@ static buffer_pool_t *allocBufferPool(struct master_pool_s *mp_large, struct mas
return ptr_pool;
}

buffer_pool_t *createBufferPool(struct master_pool_s *mp_large, struct master_pool_s *mp_small, generic_pool_t *sb_pool)
buffer_pool_t *createBufferPool(struct master_pool_s *mp_large, struct master_pool_s *mp_small, generic_pool_t *sb_pool,
unsigned int pool_width)
{
return allocBufferPool(mp_large, mp_small, sb_pool, BUFFERPOOL_CONTAINER_LEN, LARGE_BUFFER_SIZE, SMALL_BUFFER_SIZE);
return allocBufferPool(mp_large, mp_small, sb_pool, pool_width, LARGE_BUFFER_SIZE, SMALL_BUFFER_SIZE);
}
12 changes: 6 additions & 6 deletions ww/buffer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@

typedef struct buffer_pool_s buffer_pool_t;

buffer_pool_t *createBufferPool(struct master_pool_s *mp_large, struct master_pool_s *mp_small,
generic_pool_t *sb_pool);
buffer_pool_t *createBufferPool(struct master_pool_s *mp_large, struct master_pool_s *mp_small, generic_pool_t *sb_pool,
unsigned int pool_width);
shift_buffer_t *popBuffer(buffer_pool_t *pool);
shift_buffer_t *popSmallBuffer(buffer_pool_t *pool);
shift_buffer_t *appendBufferMerge(buffer_pool_t *pool, shift_buffer_t *restrict b1, shift_buffer_t *restrict b2);
void reuseBuffer(buffer_pool_t *pool, shift_buffer_t *b);
void reuseBufferThreadSafe(shift_buffer_t *buf);
unsigned int getBufferPoolLargeBufferDefaultSize(void);
unsigned int getBufferPoolSmallBufferDefaultSize(void);
bool isLargeBuffer(shift_buffer_t *buf);
// void reuseBufferThreadSafe(shift_buffer_t *buf);
unsigned int getBufferPoolLargeBufferDefaultSize(void);
unsigned int getBufferPoolSmallBufferDefaultSize(void);
bool isLargeBuffer(shift_buffer_t *buf);
11 changes: 7 additions & 4 deletions ww/devices/capture/capture.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ typedef struct capture_device_s
hthread_routine routine_reader;
hthread_routine routine_writer;

master_pool_t *reader_message_pool;
generic_pool_t *reader_shift_buffer_pool;
buffer_pool_t *reader_buffer_pool;
master_pool_t *reader_message_pool;
generic_pool_t *reader_shift_buffer_pool;
buffer_pool_t *reader_buffer_pool;
generic_pool_t *writer_shift_buffer_pool;
buffer_pool_t *writer_buffer_pool;

CaptureReadEventHandle read_event_callback;

struct hchan_s *writer_buffer_channel;
Expand All @@ -39,4 +42,4 @@ bool bringCaptureDeviceDown(capture_device_t *cdev);

capture_device_t *createCaptureDevice(const char *name, uint32_t queue_number, void *userdata,
CaptureReadEventHandle cb);
void writeToCaptureDevce(capture_device_t *cdev, shift_buffer_t *buf);
bool writeToCaptureDevce(capture_device_t *cdev, shift_buffer_t *buf);
35 changes: 21 additions & 14 deletions ww/devices/capture/capture_linux.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ static void localThreadEventReceived(hevent_t *ev)

msg->cdev->read_event_callback(msg->cdev, msg->cdev->userdata, msg->buf, tid);

reuseMasterPoolItems(msg->cdev->reader_message_pool, (void **) &msg, 1);
reuseMasterPoolItems(msg->cdev->reader_message_pool, (void **) &msg, 1, msg->cdev);
}

static void distributePacketPayload(capture_device_t *cdev, tid_t target_tid, shift_buffer_t *buf)
{
struct msg_event *msg;
popMasterPoolItems(cdev->reader_message_pool, (const void **) &(msg), 1);
popMasterPoolItems(cdev->reader_message_pool, (const void **) &(msg), 1, cdev);

*msg = (struct msg_event) {.cdev = cdev, .buf = buf};

Expand Down Expand Up @@ -176,13 +176,13 @@ static bool netfilterSetQueueLength(int netfilter_socket, uint16_t qnumber, uint
static int netfilterGetPacket(int netfilter_socket, uint16_t qnumber, shift_buffer_t *buff)
{
// Read a message from netlink
char nl_buff[512+kEthDataLen + sizeof(struct ethhdr) + sizeof(struct nfqnl_msg_packet_hdr)];
char nl_buff[512 + kEthDataLen + sizeof(struct ethhdr) + sizeof(struct nfqnl_msg_packet_hdr)];
struct sockaddr_nl nl_addr;
socklen_t nl_addr_len = sizeof(nl_addr);
ssize_t result =
recvfrom(netfilter_socket, nl_buff, sizeof(nl_buff), 0, (struct sockaddr *) &nl_addr, &nl_addr_len);
if (result <= (int) sizeof(struct nlmsghdr))

if (result <= (int) sizeof(struct nlmsghdr))
{
errno = EINVAL;
return -1;
Expand Down Expand Up @@ -335,7 +335,7 @@ static HTHREAD_ROUTINE(routineWriteToCapture) // NOLINT

nwrite = sendto(cdev->socket, ip_header, bufLen(buf), 0, (struct sockaddr *) (&to_addr), sizeof(to_addr));

reuseBufferThreadSafe(buf);
reuseBuffer(cdev->reader_buffer_pool, buf);

if (nwrite < 0)
{
Expand All @@ -346,7 +346,7 @@ static HTHREAD_ROUTINE(routineWriteToCapture) // NOLINT
return 0;
}

void writeToCaptureDevce(capture_device_t *cdev, shift_buffer_t *buf)
bool writeToCaptureDevce(capture_device_t *cdev, shift_buffer_t *buf)
{
bool closed = false;
if (! hchanTrySend(cdev->writer_buffer_channel, &buf, &closed))
Expand All @@ -359,8 +359,9 @@ void writeToCaptureDevce(capture_device_t *cdev, shift_buffer_t *buf)
{
LOGE("CaptureDevice:write failed, ring is full");
}
reuseBufferThreadSafe(buf);
return false;
}
return true;
}

bool bringCaptureDeviceUP(capture_device_t *cdev)
Expand Down Expand Up @@ -394,7 +395,7 @@ bool bringCaptureDeviceDown(capture_device_t *cdev)
shift_buffer_t *buf;
while (hchanRecv(cdev->writer_buffer_channel, &buf))
{
reuseBufferThreadSafe(buf);
reuseBuffer(cdev->reader_buffer_pool, buf);
}

return true;
Expand Down Expand Up @@ -446,8 +447,13 @@ capture_device_t *createCaptureDevice(const char *name, uint32_t queue_number, v

generic_pool_t *sb_pool = newGenericPoolWithCap(GSTATE.masterpool_shift_buffer_pools, (64) + GSTATE.ram_profile,
allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle);
buffer_pool_t *bpool =
createBufferPool(GSTATE.masterpool_buffer_pools_large, GSTATE.masterpool_buffer_pools_small, sb_pool);
buffer_pool_t *bpool = createBufferPool(GSTATE.masterpool_buffer_pools_large, GSTATE.masterpool_buffer_pools_small,
sb_pool, GSTATE.ram_profile);

generic_pool_t *writer_sb_pool = newGenericPoolWithCap(GSTATE.masterpool_shift_buffer_pools, 1,
allocShiftBufferPoolHandle, destroyShiftBufferPoolHandle);
buffer_pool_t *writer_bpool =
createBufferPool(GSTATE.masterpool_buffer_pools_large, GSTATE.masterpool_buffer_pools_small, sb_pool, 1);

capture_device_t *cdev = globalMalloc(sizeof(capture_device_t));

Expand All @@ -463,10 +469,11 @@ capture_device_t *createCaptureDevice(const char *name, uint32_t queue_number, v
.userdata = userdata,
.writer_buffer_channel = hchanOpen(sizeof(void *), kCaptureWriteChannelQueueMax),
.reader_message_pool = newMasterPoolWithCap(kMasterMessagePoolCap),
.reader_buffer_pool = bpool};
.reader_buffer_pool = bpool,
.writer_shift_buffer_pool = writer_sb_pool,
.writer_buffer_pool = writer_bpool};

installMasterPoolAllocCallbacks(cdev->reader_message_pool, cdev, allocCaptureMsgPoolHandle,
destroyCaptureMsgPoolHandle);
installMasterPoolAllocCallbacks(cdev->reader_message_pool, allocCaptureMsgPoolHandle, destroyCaptureMsgPoolHandle);

return cdev;
}
11 changes: 7 additions & 4 deletions ww/devices/raw/raw.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ typedef struct raw_device_s
hthread_routine routine_reader;
hthread_routine routine_writer;

master_pool_t *reader_message_pool;
generic_pool_t *reader_shift_buffer_pool;
buffer_pool_t *reader_buffer_pool;
master_pool_t *reader_message_pool;
generic_pool_t *reader_shift_buffer_pool;
buffer_pool_t *reader_buffer_pool;
generic_pool_t *writer_shift_buffer_pool;
buffer_pool_t *writer_buffer_pool;

RawReadEventHandle read_event_callback;

struct hchan_s *writer_buffer_channel;
Expand All @@ -38,4 +41,4 @@ bool bringRawDeviceDown(raw_device_t *rdev);

raw_device_t *createRawDevice(const char *name, uint32_t mark, void *userdata, RawReadEventHandle cb);

void writeToRawDevce(raw_device_t *rdev, shift_buffer_t *buf);
bool writeToRawDevce(raw_device_t *rdev, shift_buffer_t *buf);
Loading

0 comments on commit 5276627

Please sign in to comment.