Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a Clear method that resets the read and write indexes. #21

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions lockfree/spsc/ring_buf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ template <typename T, size_t size> class RingBuf {
public:
RingBuf();


/**
* @brief Clears the ring buffer.
*/
void Clear();


/**
* @brief Writes data to the ring buffer.
* Should only be called from the producer thread.
Expand Down
6 changes: 6 additions & 0 deletions lockfree/spsc/ring_buf_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ namespace spsc {
template <typename T, size_t size>
RingBuf<T, size>::RingBuf() : _r(0U), _w(0U) {}

template<typename T, size_t size>
void RingBuf<T, size>::Clear() {
_r = 0;
_w = 0;
}

template <typename T, size_t size>
bool RingBuf<T, size>::Write(const T *data, const size_t cnt) {
/* Preload variables with adequate memory ordering */
Expand Down
31 changes: 30 additions & 1 deletion tests/spsc/ring_buf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
TEST_CASE("spsc::RingBuf - Get free with an empty buffer",
"[rb_get_free_empty]") {
lockfree::spsc::RingBuf<float, 1024U> const rb;
const float test_data[120] = {2.71828F};

REQUIRE(rb.GetFree() == 1024U - 1U);
}
Expand Down Expand Up @@ -319,6 +318,36 @@ TEST_CASE("spsc::RingBuf - Peek std::array", "[rb_peek_std_array]") {
std::equal(test_data.begin(), test_data.end(), test_data_read.begin()));
}

TEST_CASE("Get available after clear", "[rb_get_available_after_clear]") {
lockfree::spsc::RingBuf<uint64_t, 1024U> rb;
const std::array<uint64_t, 512> test_data = {0xE5U};

rb.Write(test_data);
rb.Clear();

REQUIRE(rb.GetAvailable() == 0);
}

TEST_CASE("Get free after clear", "[rb_get_free_after_clear]") {
lockfree::spsc::RingBuf<uint64_t, 1024U> rb;
const std::array<uint64_t, 512> test_data = {0xE5U};

rb.Write(test_data);
rb.Clear();

REQUIRE(rb.GetFree() == 1024U -1U);
}

TEST_CASE("Try to read after clear", "[rb_read_clear]") {
lockfree::spsc::RingBuf<uint64_t, 512U> rb;

uint64_t test_data_read[320] = {0};
bool const read_success = rb.Read(
test_data_read, sizeof(test_data_read) / sizeof(test_data_read[0]));

REQUIRE(!read_success);
}

Copy link
Owner

@DNedic DNedic Apr 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also add a test for writing to the buffer after clearing.

Copy link
Author

@perencia-wc perencia-wc Apr 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not a fan of either approach tbh, as you can also create a new Ring Buffer over the old one. On the other hand I also want the library to be convenient for the people actually using it, so we can go with either of these.

I really don't like the first option, and regarding the second I'm not sure to understand why do we need two methods. I'm thinking in something like

template<typename T, size_t size>
void RingBuf<T, size>::Clear() {
    _r.store(0, std::memory_order_release);
    _w.store(0, std::memory_order_release);
}

wouldn't that be thread safe?

Copy link
Owner

@DNedic DNedic Apr 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This operation cannot be thread safe, as it changes both the read and write index. A SPSC queue based data structure works with the assumption that the write index will only ever be changed by the producer and the read index only ever changed by the consumer.

A store with the release memory ordering pairs with a load of the same variable with acquire memory ordering in another thread and they ensure that everything that happens before the release store is visible before the acquire load.

With this in mind, let's take a look at a scenario where the consumer is clearing the buffer, while the producer is writing for the Queue:

template <typename T, size_t size> bool Queue<T, size>::Push(const T &element) {
    /*
       The full check needs to be performed using the next write index not to
       miss the case when the read index wrapped and write index is at the end
     */
    const size_t w = _w.load(std::memory_order_relaxed);
    size_t w_next = w + 1;
    if (w_next == size) {
        w_next = 0U;
    }

    /* Full check  */
    const size_t r = _r.load(std::memory_order_acquire);
    if (w_next == r) {
        return false;
    }

    /* Place the element */
    _data[w] = element;

    /* Store the next write index */
    _w.store(w_next, std::memory_order_release);
    return true;
}

The load and store on _r ensure that this part will be visible (happens before) to the writer trying to clear the buffer:

 const size_t w = _w.load(std::memory_order_relaxed);
    size_t w_next = w + 1;
    if (w_next == size) {
        w_next = 0U;
    }

As this is the only guarantee that we get, it can happen that the writer will think that _w is a completely different value from 0 we are setting in the consumer and later down the line we will replace it with w_next.

Even if we were to replace const size_t w = _w.load(std::memory_order_relaxed); with const size_t w = _w.load(std::memory_order_acquire);, this is not going to work as within the producer and the consumer the ordering of modifications to the indexes is going to be the same, but it would need to be different (and I'm not sure it would work even then, would need to check).

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As for the second approach, it would be thread safe because the consumer could change the read index to match the write index, whatever it is, and vice versa for the producer.

Copy link
Author

@perencia-wc perencia-wc Apr 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your detailed explanation.
I think I understand the problem, though not the solution 😅.
Thinking it twice, a Clear method is not really necessary, it could be replaced with a kind of "Flush" skipping any remaining items until _r == _w, though this can require to walk all the buffer. But anyway I think some kind of clearing can be convenient, i.e as is our case, when the holder of the buffer is neither the consumer nor the producer, and some event requires the buffer to be cleared/flushed. As I can't provide a reliable solution (at least not until I fully understand it), I think it is better to let you decide if that feature of clearing/flush actually fits in the project and how to implement it 😄

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After analyzing this a bit I've found that even _r = _w, or doing Read() in a loop might not be enough, because we can't know if there is a write in progress and, _w will change immedeately after.

The reason I think this is a problem is because usually you want to flush when some assumption has changed and buffered data might not be valid anymore.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A possible solution would be to use atomic flags for signalling from the consumer to the producer and vice versa. This would add overhead for every single operation though.

TEST_CASE("spsc::RingBuf - Peek std::span", "[rb_peek_span]") {
lockfree::spsc::RingBuf<uint64_t, 512U> rb;
const uint64_t test_data[320] = {0xE5U};
Expand Down
Loading