Skip to content

Commit

Permalink
Reduce lock hold time in the ringbuffer implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Alan Jowett <[email protected]>
  • Loading branch information
Alan Jowett committed Nov 23, 2024
1 parent 5f5c451 commit a5fe86d
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 171 deletions.
2 changes: 1 addition & 1 deletion external/ebpf-verifier
Submodule ebpf-verifier updated 72 files
+1 −1 external/bpf_conformance
+1 −1 external/libbtf
+173 −123 src/asm_cfg.cpp
+37 −37 src/asm_files.cpp
+0 −5 src/asm_files.hpp
+2 −1 src/asm_marshal.cpp
+104 −112 src/asm_ostream.cpp
+58 −0 src/asm_ostream.hpp
+2 −6 src/asm_parse.cpp
+77 −29 src/asm_syntax.hpp
+5 −5 src/asm_unmarshal.cpp
+7 −15 src/assertions.cpp
+15 −21 src/config.hpp
+9 −3 src/crab/array_domain.cpp
+2 −0 src/crab/array_domain.hpp
+342 −211 src/crab/cfg.hpp
+0 −440 src/crab/ebpf_checker.cpp
+2,720 −99 src/crab/ebpf_domain.cpp
+139 −35 src/crab/ebpf_domain.hpp
+0 −2,442 src/crab/ebpf_transformer.cpp
+20 −28 src/crab/fwd_analyzer.cpp
+2 −6 src/crab/fwd_analyzer.hpp
+1 −2 src/crab/interval.hpp
+0 −94 src/crab/label.hpp
+38 −53 src/crab/split_dbm.cpp
+1 −1 src/crab/split_dbm.hpp
+4 −5 src/crab/thresholds.cpp
+1 −1 src/crab/thresholds.hpp
+1 −1 src/crab/type_domain.cpp
+2 −4 src/crab/var_factory.cpp
+0 −1 src/crab/variable.hpp
+7 −8 src/crab_utils/debug.hpp
+5 −0 src/crab_utils/stats.cpp
+4 −23 src/crab_utils/stats.hpp
+196 −129 src/crab_verifier.cpp
+9 −36 src/crab_verifier.hpp
+1 −1 src/linux/gpl/spec_prototypes.cpp
+18 −20 src/linux/linux_platform.cpp
+0 −2 src/linux/linux_platform.hpp
+24 −48 src/main/check.cpp
+16 −18 src/main/linux_verifier.cpp
+1 −1 src/main/run_yaml.cpp
+16 −0 src/main/utils.hpp
+11 −13 src/spec_type_descriptors.hpp
+61 −48 src/test/ebpf_yaml.cpp
+1 −1 src/test/ebpf_yaml.hpp
+1 −1 src/test/test_conformance.cpp
+1 −0 src/test/test_marshal.cpp
+1 −0 src/test/test_print.cpp
+53 −70 src/test/test_verify.cpp
+30 −30 src/test/test_wto.cpp
+1 −1 src/test/test_yaml.cpp
+24 −6 test-data/add.yaml
+46 −35 test-data/assign.yaml
+86 −86 test-data/atomic.yaml
+14 −7 test-data/call.yaml
+14 −7 test-data/callx.yaml
+20 −11 test-data/full64.yaml
+136 −61 test-data/jump.yaml
+106 −62 test-data/loop.yaml
+53 −34 test-data/movsx.yaml
+16 −43 test-data/packet.yaml
+60 −41 test-data/pointer.yaml
+20 −11 test-data/sdivmod.yaml
+10 −7 test-data/sext.yaml
+309 −70 test-data/shift.yaml
+13 −11 test-data/stack.yaml
+4 −2 test-data/subtract.yaml
+22 −12 test-data/udivmod.yaml
+41 −38 test-data/uninit.yaml
+6 −6 test-data/unop.yaml
+224 −196 test-data/unsigned.yaml
2 changes: 1 addition & 1 deletion external/ubpf
Submodule ubpf updated 35 files
+2 −2 .github/workflows/dependency-review.yml
+14 −126 .github/workflows/fuzzing.yml
+2 −4 .github/workflows/main.yml
+5 −5 .github/workflows/posix.yml
+2 −2 .github/workflows/scorecards.yml
+1 −1 .github/workflows/update-docs.yml
+3 −11 .github/workflows/windows.yml
+1 −1 .gitmodules
+0 −5 CMakeLists.txt
+0 −1 custom_tests/data/ubpf_test_custom_local_function_stack_size_zero.input
+0 −1 custom_tests/data/ubpf_test_default_local_function_stack_size.input
+0 −0 custom_tests/data/ubpf_test_external_stack_contents.input
+0 −11 custom_tests/descrs/ubpf_test_custom_local_function_stack_size_unaligned.md
+0 −60 custom_tests/descrs/ubpf_test_custom_local_function_stack_size_zero.md
+0 −40 custom_tests/descrs/ubpf_test_default_local_function_stack_size.md
+0 −0 custom_tests/descrs/ubpf_test_external_stack_contents.md
+0 −58 custom_tests/srcs/ubpf_test_custom_local_function_stack_size_unaligned.cc
+0 −116 custom_tests/srcs/ubpf_test_custom_local_function_stack_size_zero.cc
+0 −116 custom_tests/srcs/ubpf_test_default_local_function_stack_size.cc
+3 −43 custom_tests/srcs/ubpf_test_external_stack_contents.cc
+1 −1 external/bpf_conformance
+1 −1 external/ebpf-verifier
+16 −11 libfuzzer/CMakeLists.txt
+32 −66 libfuzzer/libfuzz_harness.cc
+0 −2 tests/call0.data
+0 −25 tests/call_local_use_stack.data
+0 −13 tests/err-call-invalid-jump.data
+0 −147 ubpf/dictionary_generator.py
+23 −0 ubpf_plugin/ubpf_plugin.cc
+0 −1 vm/CMakeLists.txt
+6 −19 vm/inc/ubpf.h
+0 −1,010 vm/ubpf_instruction_valid.c
+1 −20 vm/ubpf_int.h
+3 −21 vm/ubpf_jit_arm64.c
+32 −181 vm/ubpf_vm.c
2 changes: 1 addition & 1 deletion external/usersim
21 changes: 14 additions & 7 deletions libs/api/ebpf_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4266,15 +4266,22 @@ _ebpf_ring_buffer_map_async_query_completion(_Inout_ void* completion_context) N
break;
}

int callback_result = subscription->sample_callback(
subscription->sample_callback_context,
const_cast<void*>(reinterpret_cast<const void*>(record->data)),
record->header.length - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data));
if (callback_result != 0) {
break;
while (ebpf_ring_buffer_record_is_locked(record)) {
// The record is being written to by the producer.
// Wait for the producer to finish writing.
YieldProcessor();
}

consumer += record->header.length;
if (!ebpf_ring_buffer_record_is_discarded(record)) {
int callback_result = subscription->sample_callback(
subscription->sample_callback_context,
const_cast<void*>(reinterpret_cast<const void*>(record->data)),
ebpf_ring_buffer_record_size(record) - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data));
if (callback_result != 0) {
break;
}
}
consumer += ebpf_ring_buffer_record_size(record);
}
}

Expand Down
263 changes: 115 additions & 148 deletions libs/runtime/ebpf_ring_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,80 +8,16 @@

typedef struct _ebpf_ring_buffer
{
ebpf_lock_t lock;
size_t length;
size_t consumer_offset;
size_t producer_offset;
int64_t length;
int64_t mask;
cxplat_spin_lock_t producer_lock;
cxplat_spin_lock_t consumer_lock;
volatile int64_t consumer_offset;
volatile int64_t producer_offset;
uint8_t* shared_buffer;
ebpf_ring_descriptor_t* ring_descriptor;
} ebpf_ring_buffer_t;

// Total capacity of the ring buffer, in bytes.
inline static size_t
_ring_get_length(_In_ const ebpf_ring_buffer_t* ring)
{
    size_t total_capacity = ring->length;
    return total_capacity;
}

// Producer offset reduced modulo the buffer length, i.e. the physical
// byte index within shared_buffer where the next record will be placed.
inline static size_t
_ring_get_producer_offset(_In_ const ebpf_ring_buffer_t* ring)
{
    size_t raw_offset = ring->producer_offset;
    return raw_offset % ring->length;
}

// Consumer offset reduced modulo the buffer length, i.e. the physical
// byte index within shared_buffer of the next record to be consumed.
inline static size_t
_ring_get_consumer_offset(_In_ const ebpf_ring_buffer_t* ring)
{
    size_t raw_offset = ring->consumer_offset;
    return raw_offset % ring->length;
}

// Number of bytes currently occupied by unconsumed records
// (producer and consumer offsets are monotonically increasing,
// so their difference is the in-use span).
inline static size_t
_ring_get_used_capacity(_In_ const ebpf_ring_buffer_t* ring)
{
    size_t used_bytes = ring->producer_offset - ring->consumer_offset;
    ebpf_assert(ring->producer_offset >= ring->consumer_offset);
    return used_bytes;
}

// Moves the (monotonic, unwrapped) producer offset forward by length bytes.
inline static void
_ring_advance_producer_offset(_Inout_ ebpf_ring_buffer_t* ring, size_t length)
{
    ring->producer_offset = ring->producer_offset + length;
}

// Moves the (monotonic, unwrapped) consumer offset forward by length bytes.
inline static void
_ring_advance_consumer_offset(_Inout_ ebpf_ring_buffer_t* ring, size_t length)
{
    ring->consumer_offset = ring->consumer_offset + length;
}

// Maps a (possibly unwrapped) byte offset to the record header located
// at that position in the shared buffer, wrapping modulo the buffer length.
inline static _Ret_notnull_ ebpf_ring_buffer_record_t*
_ring_record_at_offset(_In_ const ebpf_ring_buffer_t* ring, size_t offset)
{
    size_t wrapped_offset = offset % ring->length;
    uint8_t* record_address = &ring->shared_buffer[wrapped_offset];
    return (ebpf_ring_buffer_record_t*)record_address;
}

// Returns the record the consumer should read next.
inline static _Ret_notnull_ ebpf_ring_buffer_record_t*
_ring_next_consumer_record(_In_ const ebpf_ring_buffer_t* ring)
{
    size_t consumer_offset = _ring_get_consumer_offset(ring);
    return _ring_record_at_offset(ring, consumer_offset);
}

// Carves out a new record of `requested_length` payload bytes at the
// producer position, marks it locked (producer still writing) and not
// discarded, and advances the producer offset past it.
// Returns NULL when the ring does not have room for the record plus its
// header. Caller is expected to hold the ring's lock.
inline static _Ret_maybenull_ ebpf_ring_buffer_record_t*
_ring_buffer_acquire_record(_Inout_ ebpf_ring_buffer_t* ring, size_t requested_length)
{
    // Account for the record header that precedes the payload.
    size_t total_length = requested_length + EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data);
    size_t free_space = ring->length - (ring->producer_offset - ring->consumer_offset);

    if (free_space <= total_length) {
        return NULL;
    }

    ebpf_ring_buffer_record_t* record = _ring_record_at_offset(ring, _ring_get_producer_offset(ring));
    record->header.length = (uint32_t)total_length;
    record->header.locked = 1;
    record->header.discarded = 0;
    _ring_advance_producer_offset(ring, total_length);
    return record;
}

_Must_inspect_result_ ebpf_result_t
ebpf_ring_buffer_create(_Outptr_ ebpf_ring_buffer_t** ring, size_t capacity)
{
Expand All @@ -100,6 +36,7 @@ ebpf_ring_buffer_create(_Outptr_ ebpf_ring_buffer_t** ring, size_t capacity)
}

local_ring_buffer->length = capacity;
local_ring_buffer->mask = capacity - 1;

local_ring_buffer->ring_descriptor = ebpf_allocate_ring_buffer_memory(capacity);
if (!local_ring_buffer->ring_descriptor) {
Expand Down Expand Up @@ -135,78 +72,81 @@ _Must_inspect_result_ ebpf_result_t
ebpf_ring_buffer_output(_Inout_ ebpf_ring_buffer_t* ring, _In_reads_bytes_(length) uint8_t* data, size_t length)
{
ebpf_result_t result;
ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock);
ebpf_ring_buffer_record_t* record = _ring_buffer_acquire_record(ring, length);
uint8_t* buffer;

if (record == NULL) {
result = EBPF_OUT_OF_SPACE;
goto Done;
result = ebpf_ring_buffer_reserve(ring, &buffer, length);
if (result != EBPF_SUCCESS) {
return result;
}

record->header.discarded = 0;
record->header.locked = 0;
memcpy(record->data, data, length);
result = EBPF_SUCCESS;
Done:
ebpf_lock_unlock(&ring->lock, state);
return result;
memcpy(buffer, data, length);

return ebpf_ring_buffer_submit(buffer);
}

void
ebpf_ring_buffer_query(_In_ ebpf_ring_buffer_t* ring, _Out_ size_t* consumer, _Out_ size_t* producer)
{
ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock);
*consumer = ring->consumer_offset;
*producer = ring->producer_offset;
ebpf_lock_unlock(&ring->lock, state);
*consumer = (size_t)ReadAcquire64(&ring->consumer_offset);
*producer = (size_t)ReadAcquire64(&ring->producer_offset);
}

_Must_inspect_result_ ebpf_result_t
ebpf_ring_buffer_return(_Inout_ ebpf_ring_buffer_t* ring, size_t length)
{
EBPF_LOG_ENTRY();
ebpf_result_t result;
ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock);
size_t local_length = length;
size_t offset = _ring_get_consumer_offset(ring);

if ((length > _ring_get_length(ring)) || length > _ring_get_used_capacity(ring)) {
EBPF_LOG_MESSAGE_UINT64_UINT64(
EBPF_TRACELOG_LEVEL_ERROR,
EBPF_TRACELOG_KEYWORD_MAP,
"ebpf_ring_buffer_return: Buffer too large",
ring->producer_offset,
ring->consumer_offset);
result = EBPF_INVALID_ARGUMENT;
goto Done;
KIRQL old_irql = KeGetCurrentIrql();
if (old_irql < DISPATCH_LEVEL) {
KeRaiseIrqlToDpcLevel();
}

// Verify count.
while (local_length != 0) {
ebpf_ring_buffer_record_t* record = _ring_record_at_offset(ring, offset);
if (local_length < record->header.length) {
break;
}
offset += record->header.length;
local_length -= record->header.length;
cxplat_acquire_spin_lock_at_dpc_level(&ring->consumer_lock);

int64_t consumer_offset = ReadNoFence64(&ring->consumer_offset);
int64_t producer_offset = ReadNoFence64(&ring->producer_offset);
int64_t effective_length = EBPF_PAD_8(length + EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data));

if (consumer_offset == producer_offset) {
result = EBPF_INVALID_ARGUMENT;
goto Exit;
}
// Did it end on a record boundary?
if (local_length != 0) {
EBPF_LOG_MESSAGE_UINT64(
EBPF_TRACELOG_LEVEL_ERROR,
EBPF_TRACELOG_KEYWORD_MAP,
"ebpf_ring_buffer_return: Invalid buffer length",
local_length);

int64_t remaining_space = producer_offset - consumer_offset;

if (remaining_space > effective_length) {
result = EBPF_INVALID_ARGUMENT;
goto Done;
goto Exit;
}

_ring_advance_consumer_offset(ring, length);
remaining_space = effective_length;

while (remaining_space > 0) {
ebpf_ring_buffer_record_t* record =
(ebpf_ring_buffer_record_t*)(ring->shared_buffer + (consumer_offset & ring->mask));

long size = ReadNoFence(&record->size);
size += EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data);
size = EBPF_PAD_8(size);

consumer_offset += size;
remaining_space -= size;

record->size = 0;
}

WriteNoFence64(&ring->consumer_offset, consumer_offset);

result = EBPF_SUCCESS;

Done:
ebpf_lock_unlock(&ring->lock, state);
EBPF_RETURN_RESULT(result);
Exit:
cxplat_release_spin_lock_from_dpc_level(&ring->consumer_lock);

if (old_irql < DISPATCH_LEVEL) {
KeLowerIrql(old_irql);
}

EBPF_RETURN_RESULT(EBPF_SUCCESS);
}

_Must_inspect_result_ ebpf_result_t
Expand All @@ -225,55 +165,82 @@ ebpf_ring_buffer_reserve(
_Inout_ ebpf_ring_buffer_t* ring, _Outptr_result_bytebuffer_(length) uint8_t** data, size_t length)
{
ebpf_result_t result;
ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock);
ebpf_ring_buffer_record_t* record = _ring_buffer_acquire_record(ring, length);
if (record == NULL) {
result = EBPF_INVALID_ARGUMENT;
goto Done;
KIRQL old_irql;
int64_t producer_offset = ReadNoFence64(&ring->producer_offset);
int64_t consumer_offset = ReadNoFence64(&ring->consumer_offset);
int64_t remaining_space = ring->length - (producer_offset - consumer_offset);
size_t effective_length = EBPF_PAD_8(length + EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data));

if (remaining_space < (int64_t)effective_length) {
return EBPF_NO_MEMORY;
}

old_irql = KeGetCurrentIrql();
if (old_irql < DISPATCH_LEVEL) {
KeRaiseIrqlToDpcLevel();
}

cxplat_acquire_spin_lock_at_dpc_level(&ring->producer_lock);

producer_offset = ReadNoFence64(&ring->producer_offset);
consumer_offset = ReadNoFence64(&ring->consumer_offset);

remaining_space = ring->length - (producer_offset - consumer_offset);

if (remaining_space < (int64_t)effective_length) {
result = EBPF_NO_MEMORY;
goto Exit;
}

record->header.locked = 1;
MemoryBarrier();
ebpf_ring_buffer_record_t* record =
(ebpf_ring_buffer_record_t*)(ring->shared_buffer + (producer_offset & ring->mask));

WriteNoFence(&record->size, (long)length | EBPF_RING_BUFFER_RECORD_FLAG_LOCKED);
*data = record->data;

WriteNoFence64(&ring->producer_offset, producer_offset + effective_length);

result = EBPF_SUCCESS;
Done:
ebpf_lock_unlock(&ring->lock, state);

Exit:
cxplat_release_spin_lock_from_dpc_level(&ring->producer_lock);

if (old_irql < DISPATCH_LEVEL) {
KeLowerIrql(old_irql);
}

return result;
}

_Must_inspect_result_ ebpf_result_t
ebpf_ring_buffer_submit(_Frees_ptr_opt_ uint8_t* data)
{
if (!data) {
ebpf_ring_buffer_record_t* record = CONTAINING_RECORD(data, ebpf_ring_buffer_record_t, data);
long size = ReadAcquire(&record->size);

if (!(size & EBPF_RING_BUFFER_RECORD_FLAG_LOCKED)) {
return EBPF_INVALID_ARGUMENT;
}
ebpf_ring_buffer_record_t* record =
(ebpf_ring_buffer_record_t*)(data - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data));

record->header.discarded = 0;
// Place a memory barrier here so that all prior writes to the record are completed before the record
// is unlocked. Caller needs to ensure a MemoryBarrier between reading the record->header.locked and
// the data in the record.
MemoryBarrier();
record->header.locked = 0;

size &= ~EBPF_RING_BUFFER_RECORD_FLAG_LOCKED;

WriteRelease(&record->size, size);
return EBPF_SUCCESS;
}

_Must_inspect_result_ ebpf_result_t
ebpf_ring_buffer_discard(_Frees_ptr_opt_ uint8_t* data)
{
if (!data) {
ebpf_ring_buffer_record_t* record = CONTAINING_RECORD(data, ebpf_ring_buffer_record_t, data);
long size = ReadAcquire(&record->size);

if (!(size & EBPF_RING_BUFFER_RECORD_FLAG_LOCKED)) {
return EBPF_INVALID_ARGUMENT;
}
ebpf_ring_buffer_record_t* record =
(ebpf_ring_buffer_record_t*)(data - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data));

record->header.discarded = 1;
// Place a memory barrier here so that all prior writes to the record are completed before the record
// is unlocked. Caller needs to ensure a MemoryBarrier between reading the record->header.locked and
// the data in the record.
MemoryBarrier();
record->header.locked = 0;

size &= ~EBPF_RING_BUFFER_RECORD_FLAG_LOCKED;
size |= EBPF_RING_BUFFER_RECORD_FLAG_DISCARDED;

WriteRelease(&record->size, size);
return EBPF_SUCCESS;
}
1 change: 1 addition & 0 deletions libs/runtime/ebpf_ring_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ ebpf_ring_buffer_destroy(_Frees_ptr_opt_ ebpf_ring_buffer_t* ring_buffer);
* @retval EBPF_SUCCESS Successfully wrote record ring buffer.
* @retval EBPF_OUT_OF_SPACE Unable to output to ring buffer due to inadequate space.
*/
EBPF_INLINE_HINT
_Must_inspect_result_ ebpf_result_t
ebpf_ring_buffer_output(_Inout_ ebpf_ring_buffer_t* ring_buffer, _In_reads_bytes_(length) uint8_t* data, size_t length);

Expand Down
Loading

0 comments on commit a5fe86d

Please sign in to comment.