Skip to content

Commit

Permalink
Update ringbuffer API.
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Agun committed Nov 20, 2024
1 parent 938d642 commit 407f3c1
Showing 1 changed file with 153 additions and 17 deletions.
170 changes: 153 additions & 17 deletions docs/RingBuffer.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,30 @@ Mapped memory consumer:
3. Directly read records from the producer pages (and update consumer offset as we read).
4. Call `WaitForSingleObject`/`WaitForMultipleObject` as needed to wait for new data to be available.

## API Changes
### Differences from linux API

Linux has only polling ring buffer consumers, even when callbacks are used.
On linux the user code can call `ring_buffer__consume()` to invoke the callback on all available data,
and `ring_buffer__poll()` to wait for data if needed and then consume available data.
Linux consumers can also directly read from the mapped memory by using `mmap()` to map the data
into user space and `ring_buffer__epoll_fd()` to get an epoll wait handle.

On Windows asynchronous events are supported by default,
so nothing extra needs to be done for callbacks to be invoked.

If the `RINGBUF_FLAG_NO_AUTO_CALLBACK` flag is set, callbacks will not automatically be called and `ring_buffer__poll()`
should be called to poll for available data and invoke the callback. On Windows a timeout of zero can be passed to
`ring_buffer__poll()` to get the same behaviour as `ring_buffer__consume()`.

For direct memory mapped consumers on Windows, use `ebpf_ring_buffer_get_buffer` to get pointers to the producer and consumer
pages mapped into user space, and `ebpf_ring_buffer_get_wait_handle()` to get the SynchronizationEvent (auto-reset) KEVENT
to use with `WaitForSingleObject`/`WaitForMultipleObject`.

Similar to the linux memory layout, the first page of the producer and consumer memory is the "producer page" and "consumer page",
which contain the 64 bit producer and consumer offsets as the first 8 bytes.
Only the producer may update the producer offset, and only the consumer may update the consumer offset.

## ebpf-for-windows API Changes

### Changes to ebpf helper functions

Expand Down Expand Up @@ -78,22 +101,27 @@ ring_buffer__new(int map_fd, ring_buffer_sample_fn sample_cb, void *ctx,
const struct ring_buffer_opts *opts);
/**
* @brief poll ringbuf for new data (NOT CURRENTLY SUPPORTED)
* @brief poll ringbuf for new data
* Poll for available data and consume records, if any are available.
* Returns number of records consumed (or INT_MAX, whichever is less), or
* negative number, if any of the registered callbacks returned error.
*
* If timeout_ms is zero, poll will not wait but only invoke the callback on records that are ready.
* If timeout_ms is -1, poll will wait until data is ready (no timeout).
*
* This function is only supported when the RINGBUF_FLAG_NO_AUTO_CALLBACK flag is set.
*
* @param[in] rb Pointer to ring buffer manager.
* @param[in] timeout_ms maximum time to wait for (in milliseconds).
*
* @returns number of records consumed, INT_MAX, or a negative number on error
*/
int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
/**
* @brief Frees a ring buffer manager.
*
* @param[in] rb Pointer to ring buffer manager to be freed.
*
*/
void ring_buffer__free(struct ring_buffer *rb);
```
Expand All @@ -104,24 +132,99 @@ void ring_buffer__free(struct ring_buffer *rb);
/**
* get pointers to mapped producer and consumer pages
*
* @param[in] map_fd File descriptor to ring buffer map.
* @param[out] producer pointer* to start of read-only mapped producer pages
* @param[out] consumer pointer* to start of read-write mapped consumer page
*
* @returns EBPF_SUCCESS on success, or error
*/
ebpf_result_t ebpf_ring_buffer_get_buffer(fd_t map_fd, void **producer, void **consumer);

/**
* get the wait handle to use with WaitForSingleObject/WaitForMultipleObject
*
* @param[in] map_fd File descriptor to ring buffer map.
*
* @returns Wait handle
*/
HANDLE ebpf_ring_buffer_get_wait_handle(fd_t map_fd);
```
### New user-space helpers for memory mapped consumer
```c
/**
* The below helpers simplify memory-mapped consumer logic
* by abstracting operations on the producer and consumer offsets.
*/
/**
* Get pointer to consumer offset from consumer page.
*
* @param[in] cons pointer* to start of read-write mapped consumer page
*
* @returns Pointer to consumer offset
*/
uint64_t* rb__consumer_offset(void *cons);
/**
* Get pointer to producer offset from producer page.
*
* @param[in] prod pointer* to start of read-only mapped producer pages
*
* @returns Pointer to producer offset
*/
volatile const uint64_t* rb__producer_offset(volatile const void *prod);
/**
* Check whether consumer offset == producer offset.
*
* Note that not empty doesn't mean data is ready, just that there are records that have been allocated.
* You still need to check the locked and discarded bits of the record header to determine if a record is ready.
*
* @param[in] cons pointer* to start of read-write mapped consumer page
* @param[in] prod pointer* to start of read-only mapped producer pages
*
* @returns 0 if ring buffer is empty, 1 otherwise
*/
int rb__empty(volatile const void *prod, const void *cons);
/**
* Clear the ring buffer by flushing all completed and in-progress records.
*
* This helper just sets the consumer offset to the producer offset
*
* @param[in] prod pointer* to start of read-only mapped producer pages
* @param[in,out] cons pointer* to start of read-write mapped consumer page
*/
void rb__flush(volatile const void *prod, void *cons);
/**
* Advance consumer offset to next record (if any)
*
* @param[in] prod pointer* to start of read-only mapped producer pages
* @param[in,out] cons pointer* to start of read-write mapped consumer page
*/
void rb__next_record(volatile const void *prod, void *cons);
/**
* Get record at current ringbuffer offset.
* @param[in] prod pointer* to start of read-only mapped producer pages
* @param[in] cons pointer* to start of read-write mapped consumer page
*
* @returns E_SUCCESS (0) if record ready, E_LOCKED if record still locked, E_EMPTY if consumer has caught up.
*/
int rb__get_record(volatile const void *prod, const void *cons, volatile const void** record);
```

## Ringbuffer consumer

### mapped memory consumer example

This consumer directly accesses the records from the producer memory and directly updates the consumer offset to show the logic.
This consumer directly accesses the records from the producer memory and directly updates the consumer offset to show the logic. Normally user code should use the ring buffer helpers
(see second example below) to simplify the logic.

```c++

Expand Down Expand Up @@ -244,16 +347,19 @@ Exit:
### Simplified polling ringbuf consumer
This consumer uses some possible helpers to simplify the above logic (might also want timeout).
This consumer uses the newly added helpers to consume the ring buffer.
```c
//Note: the below theoretical helpers would only need access to producers/consumer pages (offsets and data pages)
//rb__empty(prod,cons) - check whether consumer offset == consumer offset (!empty doesn't mean data is ready)
//rb__flush(prod,cons) - just set consumer offset = producer offset (skips all completed/in-progress records)
//rb__next_record(prod,cons) - advance consumer offset to next record
//rb__get_record(prod,cons,&record) - get pointer to current record (if any), skipping discarded records
//Returns E_SUCCESS (0) if record ready, E_LOCKED if record still locked, E_EMPTY if consumer has caught up.
// Initialize wait handle for map.
HANDLE wait_handle = ebpf_ring_buffer_get_wait_handle(map_fd);
if (!wait_handle) {
// … log error …
goto Exit;
}
uint32_t wait_err = 0;
// Consumer loop.
for(;;) {
for(; !(err=rb__get_record(prod,cons,&record)); rb__next_record(prod,cons)) {
// Data is now in record->data[0 ... record->length-1].
Expand All @@ -267,12 +373,42 @@ for(;;) {
// … log error …
break;
}
if (err == /* Handled errors, e.g. timeout */) {
// … log error and continue (we might still have record(s) to read) …
} else if (err != E_SUCCESS) {
// … log error …
break;
DWORD wait_status = WaitForSingleObject(wait_handle, INFINITE);
if (wait_status != WAIT_OBJECT_0) { // No notification
wait_err = GetLastError();
if (wait_err == /* terminal error */) {
// … log error …
break;
}
}
}
return err;
```

### Polling ring buffer consumer (linux-style)

```c
// sample callback
int ring_buffer_sample_fn(void *ctx, void *data, size_t size) {
// … business logic to handle record …
}

// consumer code
struct ring_buffer_opts opts;
opts.sz = sizeof(opts);
opts.flags = RINGBUF_FLAG_NO_AUTO_CALLBACK; //no automatic callbacks

fd_t map_fd = bpf_obj_get(rb_map_name.c_str());
if (map_fd == ebpf_fd_invalid) return 1;

struct ring_buffer *rb = ring_buffer__new(map_fd, ring_buffer_sample_fn sample_cb, NULL);
if (rb == NULL) return 1;

// now loop as long as there isn't an error
while(ring_buffer__poll(rb, -1) >= 0) {
// data processed by event callback
}

ring_buffer__free(rb);
```

0 comments on commit 407f3c1

Please sign in to comment.