From d008126d78f851e66f4733b0e1041bbc9c8995a8 Mon Sep 17 00:00:00 2001 From: Imaniac230 <44968160+Imaniac230@users.noreply.github.com> Date: Fri, 23 Feb 2024 21:51:34 +0100 Subject: [PATCH] feat-squash: Adjust and implement new tests for the lock-free ringbuffer implementation. * Adjusted current tests for the lock-free behavior. * Added tests for the non-blocking implementation. * Implemented new tests for scenarios, where one thread always runs slower than the other. --- ouster-ros/src/thread_safe_ring_buffer.h | 9 +- ouster-ros/test/ring_buffer_test.cpp | 398 +++++++++++++++-------- 2 files changed, 265 insertions(+), 142 deletions(-) diff --git a/ouster-ros/src/thread_safe_ring_buffer.h b/ouster-ros/src/thread_safe_ring_buffer.h index ed05eff4..2563e03a 100644 --- a/ouster-ros/src/thread_safe_ring_buffer.h +++ b/ouster-ros/src/thread_safe_ring_buffer.h @@ -136,10 +136,17 @@ class ThreadSafeRingBuffer { /** * Resets the write_idx to an initial value. * @remarks Should be mostly used by tests to allow reading of the final - * item left in the buffer. + * item left in the buffer or restarting the test scenario. */ void reset_write_idx() { write_idx = SIZE_MAX; } + /** + * Resets the read_idx to an initial value. + * @remarks Should be mostly used by tests to allow restarting the test + * scenario. + */ + void reset_read_idx() { read_idx = SIZE_MAX; } + private: /** * Performs the actual sequence of operations for writing. diff --git a/ouster-ros/test/ring_buffer_test.cpp b/ouster-ros/test/ring_buffer_test.cpp index 9c8a2936..ab08e006 100644 --- a/ouster-ros/test/ring_buffer_test.cpp +++ b/ouster-ros/test/ring_buffer_test.cpp @@ -16,7 +16,7 @@ class ThreadSafeRingBufferTest : public ::testing::Test { } void TearDown() override { - buffer.reset(); + buffer.reset(nullptr); } std::string rand_str(int size) { @@ -49,6 +49,8 @@ class ThreadSafeRingBufferTest : public ::testing::Test { void reset_writing() { buffer->reset_write_idx(); } + void reset_reading() { buffer->reset_read_idx(); } + std::unique_ptr buffer; }; @@ -56,7 +58,7 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferSimple) { assert (ITEM_COUNT > 1 && "or this test can't run"); - const int TOTAL_ITEMS = 10; // total items to process + static constexpr int TOTAL_ITEMS = 10; // total items to process const std::vector source = rand_vector_str(TOTAL_ITEMS, ITEM_SIZE); std::vector target = known_vector_str(TOTAL_ITEMS, "0000"); @@ -102,7 +104,7 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferSimple) { TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferBlocking) { - const int TOTAL_ITEMS = 10; // total items to process + static constexpr int TOTAL_ITEMS = 10; // total items to process const std::vector source = rand_vector_str(TOTAL_ITEMS, ITEM_SIZE); std::vector target = known_vector_str(TOTAL_ITEMS, "0000"); @@ -115,22 +117,20 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferBlocking) { std::memcpy(buffer, &source[i][0], ITEM_SIZE); }); } + + // Due to the lock-free implementation, that last item would not be read, since + // the reader can not know if it's still being written to. So we have to reset + // the write index before reading out the buffer. + reset_writing(); }); std::thread consumer([this, &target]() { int i = 0; - while (i < TOTAL_ITEMS - 1) { + while (i < TOTAL_ITEMS) { buffer->read([&i, &target](uint8_t* buffer){ std::memcpy(&target[i++][0], buffer, ITEM_SIZE); }); } - // Due to the lock-free implementation, that last item would not be read, since - // the reader can not know if it's still being written to. So we have to reset - // the write index before reading out the buffer. - reset_writing(); - buffer->read([&i, &target](uint8_t* buffer){ - std::memcpy(&target[i++][0], buffer, ITEM_SIZE); - }); }); producer.join(); @@ -140,11 +140,14 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferBlocking) { std::cout << "source " << source[i] << ", target " << target[i] << std::endl; EXPECT_EQ(target[i], source[i]); } + + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->full()); } TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferWithOverwrite) { - const int TOTAL_ITEMS = 10; // total items to process + static constexpr int TOTAL_ITEMS = 10; // total items to process const std::vector source = rand_vector_str(TOTAL_ITEMS, ITEM_SIZE); std::vector target = known_vector_str(TOTAL_ITEMS, "0000"); @@ -157,16 +160,17 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferWithOverwrite) { std::memcpy(buffer, &source[i][0], ITEM_SIZE); }); } + + // Due to the lock-free implementation, that last item would not be read, since + // the reader can not know if it's still being written to. So we have to reset + // the write index before reading out the buffer. + reset_writing(); }); // wait for 1 second before starting the consumer thread // allowing sufficient time for the producer thread to be // completely done std::this_thread::sleep_for(1s); - // Due to the lock-free implementation, that last item would not be read, since - // the reader can not know if it's still being written to. So we have to reset - // the write index before reading out the buffer. - reset_writing(); std::thread consumer([this, &target]() { for (int i = 0; i < TOTAL_ITEMS; ++i) { buffer->read_timeout([i, &target](uint8_t* buffer){ @@ -200,11 +204,14 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferWithOverwrite) { std::cout << "source " << source[i] << ", target " << target[i] << std::endl; EXPECT_EQ(target[i], "0000"); } + + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->full()); } TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferNonblocking) { - const int TOTAL_ITEMS = 10; // total items to process + static constexpr int TOTAL_ITEMS = 10; // total items to process const std::vector source = rand_vector_str(TOTAL_ITEMS, ITEM_SIZE); std::vector target = known_vector_str(TOTAL_ITEMS, "0000"); @@ -217,16 +224,17 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferNonblocking) { std::memcpy(buffer, &source[i][0], ITEM_SIZE); }); } + + // Due to the lock-free implementation, that last item would not be read, since + // the reader can not know if it's still being written to. So we have to reset + // the write index before reading out the buffer. + reset_writing(); }); // wait for 1 second before starting the consumer thread // allowing sufficient time for the producer thread to be // completely done std::this_thread::sleep_for(1s); - // Due to the lock-free implementation, that last item would not be read, since - // the reader can not know if it's still being written to. So we have to reset - // the write index before reading out the buffer. - reset_writing(); std::thread consumer([this, &target]() { for (int i = 0; i < TOTAL_ITEMS; ++i) { buffer->read_nonblock([i, &target](uint8_t* buffer){ @@ -251,77 +259,69 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferNonblocking) { std::cout << "source " << source[i] << ", target " << target[i] << std::endl; EXPECT_EQ(target[i], "0000"); } + + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->full()); } TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferBlockingThrottling) { - const int TOTAL_ITEMS = 10; // total items to process + static constexpr int TOTAL_ITEMS = 10; // total items to process const std::vector source = rand_vector_str(TOTAL_ITEMS, ITEM_SIZE); std::vector target = known_vector_str(TOTAL_ITEMS, "0000"); static constexpr std::chrono::milliseconds period(10); + static constexpr int period_slowing_factor = 4; EXPECT_TRUE(buffer->empty()); EXPECT_FALSE(buffer->full()); - // First, the producer will write to the buffer faster than the consumer can read. - std::thread faster_producer([this, &source]() { + // First, the consumer will read faster than the producer can write. + std::thread slower_producer([this, &source]() { for (int i = 0; i < TOTAL_ITEMS; ++i) { buffer->write([i, &source](uint8_t* buffer){ std::memcpy(buffer, &source[i][0], ITEM_SIZE); }); - std::this_thread::sleep_for(period); + std::this_thread::sleep_for(period * period_slowing_factor); } + + // Due to the lock-free implementation, that last item would not be read, since + // the reader can not know if it's still being written to. So we have to reset + // the write index before reading out the buffer. + reset_writing(); }); - std::thread slower_consumer([this, &target]() { + std::thread faster_consumer([this, &target]() { int i = 0; - while (i < TOTAL_ITEMS - 1) { + while (i < TOTAL_ITEMS) { buffer->read([&i, &target](uint8_t* buffer){ std::memcpy(&target[i++][0], buffer, ITEM_SIZE); }); - std::this_thread::sleep_for(period * 4); + std::this_thread::sleep_for(period); } - - // Due to the lock-free implementation, that last item would not be read, since - // the reader can not know if it's still being written to. So we have to reset - // the write index before reading out the buffer. - reset_writing(); - buffer->read([&i, &target](uint8_t* buffer){ - std::memcpy(&target[i++][0], buffer, ITEM_SIZE); - }); }); - faster_producer.join(); - slower_consumer.join(); - - ASSERT_TRUE(buffer->empty()); - ASSERT_FALSE(buffer->full()); + slower_producer.join(); + faster_consumer.join(); - // Blocking read and write should be synchronized even if one thread is faster. - std::cout << "Faster producer, slower consumer:" << std::endl; + // Blocking read and write should always be synchronized even if one thread is faster. + std::cout << "Slower producer, faster consumer:" << std::endl; for (int i = 0; i < TOTAL_ITEMS; ++i) { std::cout << "source " << source[i] << ", target " << target[i] << std::endl; EXPECT_EQ(target[i], source[i]); } + ASSERT_TRUE(buffer->empty()); + ASSERT_FALSE(buffer->full()); target = known_vector_str(TOTAL_ITEMS, "0000"); + reset_writing(); + reset_reading(); - // Then, then consumer will read faster than the producer can write. - std::thread slower_producer([this, &source]() { + // Then, the producer will write to the buffer faster than the consumer can read. + std::thread faster_producer([this, &source]() { for (int i = 0; i < TOTAL_ITEMS; ++i) { buffer->write([i, &source](uint8_t* buffer){ std::memcpy(buffer, &source[i][0], ITEM_SIZE); }); - std::this_thread::sleep_for(period * 4); - } - }); - - std::thread faster_consumer([this, &target]() { - int i = 0; - while (i < TOTAL_ITEMS - 1) { - buffer->read([&i, &target](uint8_t* buffer){ - std::memcpy(&target[i++][0], buffer, ITEM_SIZE); - }); std::this_thread::sleep_for(period); } @@ -329,196 +329,312 @@ TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferBlockingThrottling) { // the reader can not know if it's still being written to. So we have to reset // the write index before reading out the buffer. reset_writing(); - buffer->read([&i, &target](uint8_t* buffer){ - std::memcpy(&target[i++][0], buffer, ITEM_SIZE); - }); }); - slower_producer.join(); - faster_consumer.join(); + std::thread slower_consumer([this, &target]() { + int i = 0; + while (i < TOTAL_ITEMS) { + buffer->read([&i, &target](uint8_t* buffer){ + std::memcpy(&target[i++][0], buffer, ITEM_SIZE); + }); + std::this_thread::sleep_for(period * period_slowing_factor); + } + }); - ASSERT_TRUE(buffer->empty()); - ASSERT_FALSE(buffer->full()); + faster_producer.join(); + slower_consumer.join(); - // Blocking read and write should be synchronized even if one thread is faster. - std::cout << "Slower producer, faster consumer:" << std::endl; + // Blocking read and write should always be synchronized even if one thread is faster. + std::cout << "Faster producer, slower consumer:" << std::endl; for (int i = 0; i < TOTAL_ITEMS; ++i) { std::cout << "source " << source[i] << ", target " << target[i] << std::endl; -// EXPECT_EQ(target[i], source[i]); + EXPECT_EQ(target[i], source[i]); } - //TODO: finish asserts - const bool finishedImplementing = false; - ASSERT_TRUE(finishedImplementing); + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->full()); } TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferWithOverwriteThrottling) { - const int TOTAL_ITEMS = 10; // total items to process + static constexpr int TOTAL_ITEMS = 10; // total items to process const std::vector source = rand_vector_str(TOTAL_ITEMS, ITEM_SIZE); std::vector target = known_vector_str(TOTAL_ITEMS, "0000"); static constexpr std::chrono::milliseconds period(10); + static constexpr int period_slowing_factor = 4; EXPECT_TRUE(buffer->empty()); EXPECT_FALSE(buffer->full()); - // First, the producer will write to the buffer faster than the consumer can read. - std::thread faster_producer([this, &source]() { + // First, the consumer will read faster than the producer can write. + std::thread slower_producer([this, &source]() { for (int i = 0; i < TOTAL_ITEMS; ++i) { buffer->write_overwrite([i, &source](uint8_t* buffer){ std::memcpy(buffer, &source[i][0], ITEM_SIZE); }); - std::this_thread::sleep_for(period); - } - }); - - std::thread slower_consumer([this, &target, TOTAL_ITEMS]() { - for (int i = 0; i < TOTAL_ITEMS; ++i) { - buffer->read_timeout([i, &target](uint8_t* buffer){ - std::memcpy(&target[i][0], buffer, ITEM_SIZE); - }, 1s); - std::this_thread::sleep_for(period * 4); + std::this_thread::sleep_for(period * period_slowing_factor); } // Due to the lock-free implementation, that last item would not be read, since // the reader can not know if it's still being written to. So we have to reset // the write index before reading out the buffer. reset_writing(); - buffer->read_timeout([TOTAL_ITEMS, &target](uint8_t* buffer){ - std::memcpy(&target[TOTAL_ITEMS - 1][0], buffer, ITEM_SIZE); - }, 1s); }); - faster_producer.join(); - slower_consumer.join(); + std::thread faster_consumer([this, &target]() { + int i = 0; + while (i < TOTAL_ITEMS) { + buffer->read_timeout([&i, &target](uint8_t* buffer){ + std::memcpy(&target[i++][0], buffer, ITEM_SIZE); + }, 1s); + std::this_thread::sleep_for(period); + } + }); - ASSERT_TRUE(buffer->empty()); - ASSERT_FALSE(buffer->full()); + slower_producer.join(); + faster_consumer.join(); - // Blocking read and write should be synchronized even if one thread is faster. - std::cout << "Faster producer, slower consumer:" << std::endl; + // If the consumer is faster, it should always keep up with the latest data + // and outputs should be synchronized. + std::cout << "Slower producer, faster consumer:" << std::endl; for (int i = 0; i < TOTAL_ITEMS; ++i) { std::cout << "source " << source[i] << ", target " << target[i] << std::endl; -// EXPECT_EQ(target[i], source[i]); + EXPECT_EQ(target[i], source[i]); } + ASSERT_TRUE(buffer->empty()); + ASSERT_FALSE(buffer->full()); target = known_vector_str(TOTAL_ITEMS, "0000"); + reset_writing(); + reset_reading(); - // Then, then consumer will read faster than the producer can write. - std::thread slower_producer([this, &source]() { + // Then, the producer will write to the buffer faster than the consumer can read. + std::thread faster_producer([this, &source]() { for (int i = 0; i < TOTAL_ITEMS; ++i) { buffer->write_overwrite([i, &source](uint8_t* buffer){ std::memcpy(buffer, &source[i][0], ITEM_SIZE); }); - std::this_thread::sleep_for(period * 4); + std::this_thread::sleep_for(period); } + + // Due to the lock-free implementation, that last item would not be read, since + // the reader can not know if it's still being written to. So we have to reset + // the write index before reading out the buffer. + reset_writing(); }); - std::thread faster_consumer([this, &target]() { + std::thread slower_consumer([this, &target]() { for (int i = 0; i < TOTAL_ITEMS; ++i) { buffer->read_timeout([i, &target](uint8_t* buffer){ std::memcpy(&target[i][0], buffer, ITEM_SIZE); }, 1s); - std::this_thread::sleep_for(period); + std::this_thread::sleep_for(period * period_slowing_factor); } }); - slower_producer.join(); - faster_consumer.join(); + faster_producer.join(); + slower_consumer.join(); - // Blocking read and write should be synchronized even if one thread is faster. - std::cout << "Slower producer, faster consumer:" << std::endl; - for (int i = 0; i < TOTAL_ITEMS; ++i) { + // This part should be automated for reasonable combinations of ITEM_COUNT and + // TOTAL_ITEMS. Assuming ITEM_COUNT == 3, TOTAL_ITEMS == 10, and period_slowing_factor == 4 + // we should expect the following behavior: + // The first read attempt will start before the producer takes ownership of the next + // index, so the first item will not be filled. + // By the second read attempt, the producer will have performed 1*4 writes, + // ending back at index 0. So the second item is not filled. + // By the third read attempt, the producer wil have performed 2*4 writes, + // now ending at index 1. The consumer will fill one item. + // By the fourth attempt, the producer will have finished writing with the last + // 2 writes. The consumer will fill the last items from the buffer. + // The remaining items will not be filled, since there were no more writes + // being made. + assert ((TOTAL_ITEMS - ITEM_COUNT) > 2 && "or this test section can't run"); + + static constexpr int complete_writes = TOTAL_ITEMS / period_slowing_factor; + static constexpr int read_attempts = (((10 * TOTAL_ITEMS) / period_slowing_factor) + > 10 * complete_writes) ? complete_writes + 1 : complete_writes; + int reading_buffer_idx = 0, written_buffer_idx = 0; + int expected_source_idx = 0, written_source_idx = 0; + + std::cout << "Faster producer, slower consumer:" << std::endl; + std::cout << "source " << source[0] << ", target " << target[0] << std::endl; + EXPECT_EQ(target[0], "0000"); + + // Checking all read attempts happening after a full batch of writes, and + // the final writes plus one extra read. + for (int i = 1; i <= read_attempts + 1; ++i) { + written_source_idx = std::min(i * period_slowing_factor, TOTAL_ITEMS) - 1; + written_buffer_idx = written_source_idx % ITEM_COUNT; + expected_source_idx = written_source_idx - written_buffer_idx + reading_buffer_idx; + if (written_buffer_idx < reading_buffer_idx) + expected_source_idx -= ITEM_COUNT; + + std::cout << "source " << source[i] << ", target " << target[i] << std::endl; + if ((written_buffer_idx == reading_buffer_idx) && (written_source_idx != (TOTAL_ITEMS - 1))) { + EXPECT_EQ(target[i], "0000"); + } else { + reading_buffer_idx = (reading_buffer_idx + 1) % ITEM_COUNT; + EXPECT_EQ(target[i], source[expected_source_idx]); + } + } + + // We're not checking all final reads, since that would depend more on the + // relationship between TOTAL_ITEMS and ITEM_COUNT, and not the running behavior. + // So, just printing out any skipped items for completion. + int items_left_in_buffer = ITEM_COUNT; + if (written_buffer_idx < reading_buffer_idx) + items_left_in_buffer += written_buffer_idx + 1 - reading_buffer_idx; + for (int i = read_attempts + 2; i <= read_attempts + items_left_in_buffer; ++i) { std::cout << "source " << source[i] << ", target " << target[i] << std::endl; -// EXPECT_EQ(target[i], source[i]); } - //TODO: finish asserts - const bool finishedImplementing = false; - ASSERT_TRUE(finishedImplementing); + // Since the producer finished with overwrites faster that the consumer + // could read them out, some final items should stay empty. + for (int i = read_attempts + items_left_in_buffer + 1; i < TOTAL_ITEMS; ++i) { + std::cout << "source " << source[i] << ", target " << target[i] << std::endl; + EXPECT_EQ(target[i], "0000"); + } + + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->full()); } TEST_F(ThreadSafeRingBufferTest, ReadWriteToBufferNonblockingThrottling) { - const int TOTAL_ITEMS = 10; // total items to process + static constexpr int TOTAL_ITEMS = 10; // total items to process const std::vector source = rand_vector_str(TOTAL_ITEMS, ITEM_SIZE); std::vector target = known_vector_str(TOTAL_ITEMS, "0000"); static constexpr std::chrono::milliseconds period(10); + static constexpr int period_slowing_factor = 4; EXPECT_TRUE(buffer->empty()); EXPECT_FALSE(buffer->full()); - // First, the producer will write to the buffer faster than the consumer can read. - std::thread faster_producer([this, &source]() { + // First, the consumer will read faster than the producer can write. + std::thread slower_producer([this, &source]() { for (int i = 0; i < TOTAL_ITEMS; ++i) { buffer->write_nonblock([i, &source](uint8_t* buffer){ std::memcpy(buffer, &source[i][0], ITEM_SIZE); }); - std::this_thread::sleep_for(period); + std::this_thread::sleep_for(period * period_slowing_factor); } + + // Due to the lock-free implementation, that last item would not be read, since + // the reader can not know if it's still being written to. So we have to reset + // the write index before reading out the buffer. + reset_writing(); }); - std::thread slower_consumer([this, &target, TOTAL_ITEMS]() { - for (int i = 0; i < TOTAL_ITEMS; ++i) { - buffer->read_nonblock([i, &target](uint8_t* buffer){ - std::memcpy(&target[i][0], buffer, ITEM_SIZE); + std::thread faster_consumer([this, &target]() { + int i = 0; + while (i < TOTAL_ITEMS) { + buffer->read_nonblock([&i, &target](uint8_t* buffer){ + std::memcpy(&target[i++][0], buffer, ITEM_SIZE); }); - std::this_thread::sleep_for(period * 4); + std::this_thread::sleep_for(period); } - - reset_writing(); - buffer->read_nonblock([TOTAL_ITEMS, &target](uint8_t* buffer){ - std::memcpy(&target[TOTAL_ITEMS - 1][0], buffer, ITEM_SIZE); - }); }); - faster_producer.join(); - slower_consumer.join(); - - ASSERT_TRUE(buffer->empty()); - ASSERT_FALSE(buffer->full()); + slower_producer.join(); + faster_consumer.join(); - // Blocking read and write should be synchronized even if one thread is faster. - std::cout << "Faster producer, slower consumer:" << std::endl; + // If the consumer is faster, it should always keep up with the latest data + // and outputs should be synchronized. + std::cout << "Slower producer, faster consumer:" << std::endl; for (int i = 0; i < TOTAL_ITEMS; ++i) { std::cout << "source " << source[i] << ", target " << target[i] << std::endl; - // EXPECT_EQ(target[i], source[i]); + EXPECT_EQ(target[i], source[i]); } + ASSERT_TRUE(buffer->empty()); + ASSERT_FALSE(buffer->full()); target = known_vector_str(TOTAL_ITEMS, "0000"); + reset_writing(); + reset_reading(); - // Then, then consumer will read faster than the producer can write. - std::thread slower_producer([this, &source]() { + // Then, the producer will write to the buffer faster than the consumer can read. + std::thread faster_producer([this, &source]() { for (int i = 0; i < TOTAL_ITEMS; ++i) { buffer->write_nonblock([i, &source](uint8_t* buffer){ std::memcpy(buffer, &source[i][0], ITEM_SIZE); }); - std::this_thread::sleep_for(period * 4); + std::this_thread::sleep_for(period); } + + // Due to the lock-free implementation, that last item would not be read, since + // the reader can not know if it's still being written to. So we have to reset + // the write index before reading out the buffer. + reset_writing(); }); - std::thread faster_consumer([this, &target]() { + std::thread slower_consumer([this, &target]() { for (int i = 0; i < TOTAL_ITEMS; ++i) { buffer->read_nonblock([i, &target](uint8_t* buffer){ std::memcpy(&target[i][0], buffer, ITEM_SIZE); }); - std::this_thread::sleep_for(period); + std::this_thread::sleep_for(period * period_slowing_factor); } }); - slower_producer.join(); - faster_consumer.join(); + faster_producer.join(); + slower_consumer.join(); - // Blocking read and write should be synchronized even if one thread is faster. - std::cout << "Slower producer, faster consumer:" << std::endl; - for (int i = 0; i < TOTAL_ITEMS; ++i) { + // This part should be automated for reasonable combinations of ITEM_COUNT and + // TOTAL_ITEMS. Assuming ITEM_COUNT == 3, TOTAL_ITEMS == 10, and period_slowing_factor == 4 + // we should expect the following behavior: + // The first read attempt will start before the producer takes ownership of the next + // index, so the first item will not be filled. + // By the second read attempt, the producer will have performed 1*4 writes, + // filling the buffer and dropping the last write. The consumer will fill in + // the first item. + // By the third read attempt, the producer will have performed 2*4 writes, + // filling only one item and dropping the rest. The consumer will read the second + // item. + // By the fourth attempt, the producer will have finished with the last 2 writes, + // filling only one item and dropping the final one. + // The consumer will read the last items from the buffer. + // The remaining items will not be filled, since there were no more writes + // being made. + assert ((TOTAL_ITEMS - ITEM_COUNT) > 2 && "or this test section can't run"); + assert(period_slowing_factor > 2 && "or this test section can't run"); + + static constexpr int full_writes = ITEM_COUNT / period_slowing_factor; + static constexpr int filling_writes = (((10 * ITEM_COUNT) / period_slowing_factor) + > 10 * full_writes) ? full_writes + 1 : full_writes; + static constexpr int consecutive_reads = std::min(ITEM_COUNT + full_writes, TOTAL_ITEMS - 1); + static constexpr int saturated_reads = + (TOTAL_ITEMS + (TOTAL_ITEMS % period_slowing_factor) - consecutive_reads) / period_slowing_factor; + int expected_source_idx = 0; + + std::cout << "Faster producer, slower consumer:" << std::endl; + std::cout << "source " << source[0] << ", target " << target[0] << std::endl; + EXPECT_EQ(target[0], "0000"); + + // The buffer should always be filled with consecutive items upto the point, + // when the producer catches up with the consumer on its second pass. + for (int i = 1; i <= consecutive_reads; ++i) { + std::cout << "source " << source[i] << ", target " << target[i] << std::endl; + EXPECT_EQ(target[i], source[i - 1]); + } + + // Since the producer cannot overwrite, it would always fill just the first item, + // in its iteration. + int writing_iteration = 1; + for (int i = consecutive_reads + 1; i <= consecutive_reads + saturated_reads; ++i) { + expected_source_idx = (full_writes + writing_iteration++) * period_slowing_factor; std::cout << "source " << source[i] << ", target " << target[i] << std::endl; - // EXPECT_EQ(target[i], source[i]); + EXPECT_EQ(target[i], source[expected_source_idx]); } - //TODO: finish asserts - const bool finishedImplementing = false; - ASSERT_TRUE(finishedImplementing); + // Since the producer finished with writes faster that the consumer + // could read them out, some final items should stay empty. + for (int i = consecutive_reads + saturated_reads + 1; i < TOTAL_ITEMS; ++i) { + std::cout << "source " << source[i] << ", target " << target[i] << std::endl; + EXPECT_EQ(target[i], "0000"); + } + + EXPECT_TRUE(buffer->empty()); + EXPECT_FALSE(buffer->full()); } int main(int argc, char** argv)