
Ringbuffer design updates.
Splits the design so callback consumers use the existing libbpf APIs,
and mapped memory consumers use the new windows-specific functions.
Michael Agun committed Oct 18, 2024
1 parent a848715 commit ea47230
Showing 1 changed file with 41 additions and 83 deletions.

docs/RingBuffer.md
The new API will support 2 consumer types: callbacks and direct access to the mapped memory.

Callback consumer:

1. Call `ring_buffer__new` to set up callback.
2. The callback will be invoked for each record written to the ring buffer.

Mapped memory consumer:

1. Call `ebpf_ring_buffer_get_buffer` to get pointers to the mapped producer/consumer pages.
2. Call `ebpf_ring_buffer_get_wait_handle` to get the wait handle.
3. Directly read records from the producer pages (and update consumer offset as we read).
4. Call `WaitForSingleObject`/`WaitForMultipleObjects` as needed to wait for new data to be available.

## API Changes

```c
ebpf_result_t
ebpf_ring_buffer_output(ebpf_ring_buffer_t* ring, uint8_t* data, size_t length, size_t flags)
```
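For producers, records are written from eBPF program code through the ring buffer helpers, which call into `ebpf_ring_buffer_output`. Below is a hypothetical producer-side sketch using the standard `bpf_ringbuf_output()` helper; the map name, section name, and event layout are illustrative, not part of this design.

```c
#include <stdint.h>
#include "bpf_helpers.h"

// Ring buffer map that the consumer examples below read from.
struct
{
    __uint(type, BPF_MAP_TYPE_RINGBUF);
    __uint(max_entries, 64 * 1024);
} ringbuf_map SEC(".maps");

SEC("sample_ext")
int
producer(void* ctx)
{
    uint64_t event = 42; // … event data …

    // Copies the event into the ring buffer as one record and
    // signals any waiting consumer.
    if (bpf_ringbuf_output(&ringbuf_map, &event, sizeof(event), 0) < 0) {
        return 1;
    }
    return 0;
}
```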
### Existing libbpf functions for callback consumer
Use the existing `ring_buffer__new()` to set up automatic callbacks for each record.
Call `ebpf_ring_buffer_get_buffer()` ([New eBPF APIs](#new-ebpf-apis-for-mapped-memory-consumer))
to get direct access to the mapped ringbuffer memory.
```c
struct ring_buffer;
typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);

struct ring_buffer *
ring_buffer__new(int map_fd, ring_buffer_sample_fn sample_cb, void *ctx,
const struct ring_buffer_opts *opts);
/**
* @brief Frees a ring buffer manager.
*
* @param[in] rb Pointer to ring buffer manager to be freed.
*
*/
void ring_buffer__free(struct ring_buffer *rb);
```
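A minimal callback-consumer sketch (the pin path and shutdown logic are illustrative; error handling abbreviated):

```c
#include <bpf/libbpf.h>

static int
handle_record(void* ctx, void* data, size_t size)
{
    // data/size describe a single record written by the eBPF program.
    // … process the record …
    return 0;
}

int
run_callback_consumer(void)
{
    fd_t map_fd = bpf_obj_get("ringbuf_map");
    if (map_fd == ebpf_fd_invalid) {
        return 1;
    }

    struct ring_buffer* rb = ring_buffer__new(map_fd, handle_record, NULL, NULL);
    if (!rb) {
        return 1;
    }

    // The callback is invoked for each record as it is written, so no
    // explicit poll/consume loop is needed here.
    // … run until shutdown …

    ring_buffer__free(rb);
    return 0;
}
```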

### New ebpf APIs for mapped memory consumer

```c
/**
* @brief Get pointers to the mapped ringbuffer memory.
*
* @param[out] producer Pointer to start of read-only mapped producer pages.
* @param[out] consumer Pointer to start of read-write mapped consumer page.
*/
ebpf_result_t ebpf_ring_buffer_get_buffer(fd_t map_fd, void **producer, void **consumer);

/**
* @brief Get the wait handle to use with WaitForSingleObject/WaitForMultipleObjects.
*
* @param[in] map_fd File descriptor to ring buffer map.
*
* @returns Wait handle
*/
HANDLE ebpf_ring_buffer_get_wait_handle(fd_t map_fd);
```
## Ringbuffer consumer
### Mapped memory consumer example
This consumer reads records directly from the producer memory and updates the consumer offset itself, to show the underlying logic.
// Ring buffer record is 64 bit header + data.
typedef struct _rb_header
{
//NOTE: bit fields are not portable, so this is just for simpler example code -- the actual code should use bit masking to perform equivalent operations on the header bits, and ReadAcquire to read the header.
uint8_t locked : 1;
uint8_t discarded : 1;
uint32_t length : 30;
// … (remaining header bits elided) …
} rb_header_t;

typedef struct _rb_record
{
rb_header_t header;
uint8_t data[];
} rb_record_t;

/**
* @brief Clear the ringbuffer by advancing the consumer offset to the producer offset (discards all unread records).
*/
void rb_flush(uint64_t *cons_offset, const uint64_t *prod_offset) {
WriteRelease64(cons_offset, ReadAcquire64(prod_offset));
}
void *rb_cons; // Pointer to start of read-write consumer page.
void *rb_prod; // Pointer to start of read-only producer pages.
uint64_t rb_size; // Ring data size in bytes (assumed known from the map definition).
fd_t map_fd = bpf_obj_get(rb_map_name.c_str());
if (map_fd == ebpf_fd_invalid) return 1;
// Initialize wait handle for map.
HANDLE wait_handle = ebpf_ring_buffer_get_wait_handle(map_fd);
if (!wait_handle) {
// … log error …
goto Exit;
}
// get pointers to the producer/consumer pages
int err = ebpf_ring_buffer_get_buffer(map_fd, &rb_prod, &rb_cons);
if (err) {
goto Exit;
}
const uint64_t *prod_offset = (const uint64_t*)rb_prod; // Producer offset ptr (r only).
uint64_t *cons_offset = (uint64_t*)rb_cons; // Consumer offset ptr (r/w mapped).
const uint8_t *rb_data = ((const uint8_t*)rb_prod) + PAGESIZE; // Double-mapped rb data ptr (r only).
uint64_t producer_offset = ReadAcquire64(prod_offset);
uint64_t consumer_offset = *cons_offset; // only one consumer so don't need ReadAcquire.
// have_data used to track whether we should wait for notification or just keep reading.
bool have_data = producer_offset > consumer_offset;
// Now loop until error.
for (;;) {
if (!have_data) { // Only wait if we have already caught up.
// Wait for rb to notify -- or we could spin/poll until *prod_offset > *cons_offset.
DWORD wait_status = WaitForSingleObject(wait_handle, INFINITE);
if (wait_status != WAIT_OBJECT_0) { // No notification
if (wait_status != WAIT_TIMEOUT) { // (reconstructed) treat anything but a timeout as fatal.
// … log error …
break;
}
producer_offset = ReadAcquire64(prod_offset);
have_data = producer_offset > consumer_offset; // It's possible we still have data.
if (!have_data) continue;
} else { // We got notified of new data.
have_data = true;
}
}
uint64_t remaining = producer_offset - consumer_offset;
if (remaining == 0) {
have_data = false; // Caught up to producer.
continue;
}
// Check header flags first, then read/skip data and update offset.
rb_header_t header = *(rb_header_t*)(&rb_data[consumer_offset % rb_size]);
if (header.locked) { // Next record not ready yet, wait.
have_data = false;
continue;
// Or we could spin/poll on ((rb_header_t*)(&rb_data[consumer_offset % rb_size]))->locked.
}
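// Equivalent masked read (assuming the lock/discard flags are the top two
// bits of the first 32 header bits, as in the Linux ringbuf layout):
//   uint32_t len = ReadAcquire((volatile LONG*)&rb_data[consumer_offset % rb_size]);
//   bool locked = len & (1u << 31), discarded = len & (1u << 30);
//   uint32_t length = len & ~(3u << 30);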
if (!header.discarded) {
const rb_record_t *record = (const rb_record_t*)(&rb_data[consumer_offset % rb_size]);
// Read data from record->data[0 ... record->length-1].
// … business logic …
} // Else it was discarded, skip and continue.
// Update consumer offset (and pad record length to multiple of 8).
consumer_offset += sizeof(rb_header_t) + ((header.length + 7) & ~7); // Use header.length -- record is out of scope here.
WriteRelease64(cons_offset, consumer_offset);
}
Exit:
```

### Simplified polling ringbuf consumer

This consumer uses some possible helper functions to simplify the logic above (a real consumer might also want a timeout).

```c
// (setup elided -- see the mapped memory example above)
int err = 0;
for(;;) {
// Drain all available records with a hypothetical helper:
while (!(err = rb__next_record(prod, cons, &record))) {
// … Do record handling here …
}
// 3 cases for err:
// 1) Ringbuf empty - Wait with epoll, or poll for !rb__empty(prod,cons).
// 2) Record locked - Wait with epoll, or spin/poll on header lock bit.
// 1) Ringbuf empty - Wait on handle, or poll for !rb__empty(prod,cons).
// 2) Record locked - Wait on handle, or spin/poll on header lock bit.
// 3) Corrupt record or consumer offset - Break (could flush to continue reading from next good record).
if (err!=E_EMPTY && err!=E_LOCKED) {
// … log error …
break;
}
// Ringbuf empty or record locked: wait for new data, then continue.
}
```
