Skip to content

Commit

Permalink
Reduce lock hold time in the ringbuffer implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Alan Jowett <[email protected]>
  • Loading branch information
Alan Jowett committed Nov 23, 2024
1 parent 5f5c451 commit d1c0713
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 169 deletions.
2 changes: 1 addition & 1 deletion external/usersim
21 changes: 14 additions & 7 deletions libs/api/ebpf_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4266,15 +4266,22 @@ _ebpf_ring_buffer_map_async_query_completion(_Inout_ void* completion_context) N
break;
}

int callback_result = subscription->sample_callback(
subscription->sample_callback_context,
const_cast<void*>(reinterpret_cast<const void*>(record->data)),
record->header.length - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data));
if (callback_result != 0) {
break;
while (ebpf_ring_buffer_record_is_locked(record)) {
// The record is being written to by the producer.
// Wait for the producer to finish writing.
YieldProcessor();
}

consumer += record->header.length;
if (!ebpf_ring_buffer_record_is_discarded(record)) {
int callback_result = subscription->sample_callback(
subscription->sample_callback_context,
const_cast<void*>(reinterpret_cast<const void*>(record->data)),
ebpf_ring_buffer_record_size(record) - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data));
if (callback_result != 0) {
break;
}
}
consumer += ebpf_ring_buffer_record_size(record);
}
}

Expand Down
263 changes: 115 additions & 148 deletions libs/runtime/ebpf_ring_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,80 +8,16 @@

typedef struct _ebpf_ring_buffer
{
ebpf_lock_t lock;
size_t length;
size_t consumer_offset;
size_t producer_offset;
int64_t length;
int64_t mask;
cxplat_spin_lock_t producer_lock;
cxplat_spin_lock_t consumer_lock;
volatile int64_t consumer_offset;
volatile int64_t producer_offset;
uint8_t* shared_buffer;
ebpf_ring_descriptor_t* ring_descriptor;
} ebpf_ring_buffer_t;

inline static size_t
_ring_get_length(_In_ const ebpf_ring_buffer_t* ring)
{
return ring->length;
}

inline static size_t
_ring_get_producer_offset(_In_ const ebpf_ring_buffer_t* ring)
{
return ring->producer_offset % ring->length;
}

inline static size_t
_ring_get_consumer_offset(_In_ const ebpf_ring_buffer_t* ring)
{
return ring->consumer_offset % ring->length;
}

inline static size_t
_ring_get_used_capacity(_In_ const ebpf_ring_buffer_t* ring)
{
ebpf_assert(ring->producer_offset >= ring->consumer_offset);
return ring->producer_offset - ring->consumer_offset;
}

inline static void
_ring_advance_producer_offset(_Inout_ ebpf_ring_buffer_t* ring, size_t length)
{
ring->producer_offset += length;
}

inline static void
_ring_advance_consumer_offset(_Inout_ ebpf_ring_buffer_t* ring, size_t length)
{
ring->consumer_offset += length;
}

inline static _Ret_notnull_ ebpf_ring_buffer_record_t*
_ring_record_at_offset(_In_ const ebpf_ring_buffer_t* ring, size_t offset)
{
return (ebpf_ring_buffer_record_t*)&ring->shared_buffer[offset % ring->length];
}

inline static _Ret_notnull_ ebpf_ring_buffer_record_t*
_ring_next_consumer_record(_In_ const ebpf_ring_buffer_t* ring)
{
return _ring_record_at_offset(ring, _ring_get_consumer_offset(ring));
}

inline static _Ret_maybenull_ ebpf_ring_buffer_record_t*
_ring_buffer_acquire_record(_Inout_ ebpf_ring_buffer_t* ring, size_t requested_length)
{
ebpf_ring_buffer_record_t* record = NULL;
requested_length += EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data);
size_t remaining_space = ring->length - (ring->producer_offset - ring->consumer_offset);

if (remaining_space > requested_length) {
record = _ring_record_at_offset(ring, _ring_get_producer_offset(ring));
_ring_advance_producer_offset(ring, requested_length);
record->header.length = (uint32_t)requested_length;
record->header.locked = 1;
record->header.discarded = 0;
}
return record;
}

_Must_inspect_result_ ebpf_result_t
ebpf_ring_buffer_create(_Outptr_ ebpf_ring_buffer_t** ring, size_t capacity)
{
Expand All @@ -100,6 +36,7 @@ ebpf_ring_buffer_create(_Outptr_ ebpf_ring_buffer_t** ring, size_t capacity)
}

local_ring_buffer->length = capacity;
local_ring_buffer->mask = capacity - 1;

local_ring_buffer->ring_descriptor = ebpf_allocate_ring_buffer_memory(capacity);
if (!local_ring_buffer->ring_descriptor) {
Expand Down Expand Up @@ -135,78 +72,81 @@ _Must_inspect_result_ ebpf_result_t
ebpf_ring_buffer_output(_Inout_ ebpf_ring_buffer_t* ring, _In_reads_bytes_(length) uint8_t* data, size_t length)
{
ebpf_result_t result;
ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock);
ebpf_ring_buffer_record_t* record = _ring_buffer_acquire_record(ring, length);
uint8_t* buffer;

if (record == NULL) {
result = EBPF_OUT_OF_SPACE;
goto Done;
result = ebpf_ring_buffer_reserve(ring, &buffer, length);
if (result != EBPF_SUCCESS) {
return result;
}

record->header.discarded = 0;
record->header.locked = 0;
memcpy(record->data, data, length);
result = EBPF_SUCCESS;
Done:
ebpf_lock_unlock(&ring->lock, state);
return result;
memcpy(buffer, data, length);

return ebpf_ring_buffer_submit(buffer);
}

void
ebpf_ring_buffer_query(_In_ ebpf_ring_buffer_t* ring, _Out_ size_t* consumer, _Out_ size_t* producer)
{
ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock);
*consumer = ring->consumer_offset;
*producer = ring->producer_offset;
ebpf_lock_unlock(&ring->lock, state);
*consumer = (size_t)ReadAcquire64(&ring->consumer_offset);
*producer = (size_t)ReadAcquire64(&ring->producer_offset);
}

_Must_inspect_result_ ebpf_result_t
ebpf_ring_buffer_return(_Inout_ ebpf_ring_buffer_t* ring, size_t length)
{
EBPF_LOG_ENTRY();
ebpf_result_t result;
ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock);
size_t local_length = length;
size_t offset = _ring_get_consumer_offset(ring);

if ((length > _ring_get_length(ring)) || length > _ring_get_used_capacity(ring)) {
EBPF_LOG_MESSAGE_UINT64_UINT64(
EBPF_TRACELOG_LEVEL_ERROR,
EBPF_TRACELOG_KEYWORD_MAP,
"ebpf_ring_buffer_return: Buffer too large",
ring->producer_offset,
ring->consumer_offset);
result = EBPF_INVALID_ARGUMENT;
goto Done;
KIRQL old_irql = KeGetCurrentIrql();
if (old_irql < DISPATCH_LEVEL) {
KeRaiseIrqlToDpcLevel();
}

// Verify count.
while (local_length != 0) {
ebpf_ring_buffer_record_t* record = _ring_record_at_offset(ring, offset);
if (local_length < record->header.length) {
break;
}
offset += record->header.length;
local_length -= record->header.length;
cxplat_acquire_spin_lock_at_dpc_level(&ring->consumer_lock);

int64_t consumer_offset = ReadNoFence64(&ring->consumer_offset);
int64_t producer_offset = ReadNoFence64(&ring->producer_offset);
int64_t effective_length = EBPF_PAD_8(length + EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data));

if (consumer_offset == producer_offset) {
result = EBPF_INVALID_ARGUMENT;
goto Exit;
}
// Did it end on a record boundary?
if (local_length != 0) {
EBPF_LOG_MESSAGE_UINT64(
EBPF_TRACELOG_LEVEL_ERROR,
EBPF_TRACELOG_KEYWORD_MAP,
"ebpf_ring_buffer_return: Invalid buffer length",
local_length);

int64_t remaining_space = producer_offset - consumer_offset;

if (remaining_space > effective_length) {
result = EBPF_INVALID_ARGUMENT;
goto Done;
goto Exit;
}

_ring_advance_consumer_offset(ring, length);
remaining_space = effective_length;

while (remaining_space > 0) {
ebpf_ring_buffer_record_t* record =
(ebpf_ring_buffer_record_t*)(ring->shared_buffer + (consumer_offset & ring->mask));

long size = ReadNoFence(&record->size);
size += EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data);
size = EBPF_PAD_8(size);

consumer_offset += size;
remaining_space -= size;

record->size = 0;
}

WriteNoFence64(&ring->consumer_offset, consumer_offset);

result = EBPF_SUCCESS;

Done:
ebpf_lock_unlock(&ring->lock, state);
EBPF_RETURN_RESULT(result);
Exit:
cxplat_release_spin_lock_from_dpc_level(&ring->consumer_lock);

if (old_irql < DISPATCH_LEVEL) {
KeLowerIrql(old_irql);
}

EBPF_RETURN_RESULT(EBPF_SUCCESS);
}

_Must_inspect_result_ ebpf_result_t
Expand All @@ -225,55 +165,82 @@ ebpf_ring_buffer_reserve(
_Inout_ ebpf_ring_buffer_t* ring, _Outptr_result_bytebuffer_(length) uint8_t** data, size_t length)
{
ebpf_result_t result;
ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock);
ebpf_ring_buffer_record_t* record = _ring_buffer_acquire_record(ring, length);
if (record == NULL) {
result = EBPF_INVALID_ARGUMENT;
goto Done;
KIRQL old_irql;
int64_t producer_offset = ReadNoFence64(&ring->producer_offset);
int64_t consumer_offset = ReadNoFence64(&ring->consumer_offset);
int64_t remaining_space = ring->length - (producer_offset - consumer_offset);
size_t effective_length = EBPF_PAD_8(length + EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data));

if (remaining_space < (int64_t)effective_length) {
return EBPF_NO_MEMORY;
}

old_irql = KeGetCurrentIrql();
if (old_irql < DISPATCH_LEVEL) {
KeRaiseIrqlToDpcLevel();
}

cxplat_acquire_spin_lock_at_dpc_level(&ring->producer_lock);

producer_offset = ReadNoFence64(&ring->producer_offset);
consumer_offset = ReadNoFence64(&ring->consumer_offset);

remaining_space = ring->length - (producer_offset - consumer_offset);

if (remaining_space < (int64_t)effective_length) {
result = EBPF_NO_MEMORY;
goto Exit;
}

record->header.locked = 1;
MemoryBarrier();
ebpf_ring_buffer_record_t* record =
(ebpf_ring_buffer_record_t*)(ring->shared_buffer + (producer_offset & ring->mask));

WriteNoFence(&record->size, (long)length | EBPF_RING_BUFFER_RECORD_FLAG_LOCKED);
*data = record->data;

WriteNoFence64(&ring->producer_offset, producer_offset + effective_length);

result = EBPF_SUCCESS;
Done:
ebpf_lock_unlock(&ring->lock, state);

Exit:
cxplat_release_spin_lock_from_dpc_level(&ring->producer_lock);

if (old_irql < DISPATCH_LEVEL) {
KeLowerIrql(old_irql);
}

return result;
}

_Must_inspect_result_ ebpf_result_t
ebpf_ring_buffer_submit(_Frees_ptr_opt_ uint8_t* data)
{
if (!data) {
ebpf_ring_buffer_record_t* record = CONTAINING_RECORD(data, ebpf_ring_buffer_record_t, data);
long size = ReadAcquire(&record->size);

if (!(size & EBPF_RING_BUFFER_RECORD_FLAG_LOCKED)) {
return EBPF_INVALID_ARGUMENT;
}
ebpf_ring_buffer_record_t* record =
(ebpf_ring_buffer_record_t*)(data - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data));

record->header.discarded = 0;
// Place a memory barrier here so that all prior writes to the record are completed before the record
// is unlocked. Caller needs to ensure a MemoryBarrier between reading the record->header.locked and
// the data in the record.
MemoryBarrier();
record->header.locked = 0;

size &= ~EBPF_RING_BUFFER_RECORD_FLAG_LOCKED;

WriteRelease(&record->size, size);
return EBPF_SUCCESS;
}

_Must_inspect_result_ ebpf_result_t
ebpf_ring_buffer_discard(_Frees_ptr_opt_ uint8_t* data)
{
if (!data) {
ebpf_ring_buffer_record_t* record = CONTAINING_RECORD(data, ebpf_ring_buffer_record_t, data);
long size = ReadAcquire(&record->size);

if (!(size & EBPF_RING_BUFFER_RECORD_FLAG_LOCKED)) {
return EBPF_INVALID_ARGUMENT;
}
ebpf_ring_buffer_record_t* record =
(ebpf_ring_buffer_record_t*)(data - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data));

record->header.discarded = 1;
// Place a memory barrier here so that all prior writes to the record are completed before the record
// is unlocked. Caller needs to ensure a MemoryBarrier between reading the record->header.locked and
// the data in the record.
MemoryBarrier();
record->header.locked = 0;

size &= ~EBPF_RING_BUFFER_RECORD_FLAG_LOCKED;
size |= EBPF_RING_BUFFER_RECORD_FLAG_DISCARDED;

WriteRelease(&record->size, size);
return EBPF_SUCCESS;
}
1 change: 1 addition & 0 deletions libs/runtime/ebpf_ring_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ ebpf_ring_buffer_destroy(_Frees_ptr_opt_ ebpf_ring_buffer_t* ring_buffer);
* @retval EBPF_SUCCESS Successfully wrote record ring buffer.
* @retval EBPF_OUT_OF_SPACE Unable to output to ring buffer due to inadequate space.
*/
EBPF_INLINE_HINT
_Must_inspect_result_ ebpf_result_t
ebpf_ring_buffer_output(_Inout_ ebpf_ring_buffer_t* ring_buffer, _In_reads_bytes_(length) uint8_t* data, size_t length);

Expand Down
Loading

0 comments on commit d1c0713

Please sign in to comment.