From 8e9659cac77eafc82c407d1a79999f0dfe7ee89a Mon Sep 17 00:00:00 2001
From: Daniil Mironenko
Date: Tue, 22 Oct 2024 16:35:37 +0300
Subject: [PATCH] add scheduler tests

---
 tests/sched/run_loop/intrusive.cpp    |  36 ++++
 tests/sched/run_loop/unit.cpp         | 106 +++++++++++
 tests/sched/thread_pool/intrusive.cpp |  40 ++++
 tests/sched/thread_pool/unit.cpp      | 265 ++++++++++++++++++++++++++
 4 files changed, 447 insertions(+)
 create mode 100644 tests/sched/run_loop/intrusive.cpp
 create mode 100644 tests/sched/run_loop/unit.cpp
 create mode 100644 tests/sched/thread_pool/intrusive.cpp
 create mode 100644 tests/sched/thread_pool/unit.cpp

diff --git a/tests/sched/run_loop/intrusive.cpp b/tests/sched/run_loop/intrusive.cpp
new file mode 100644
index 0000000..937c7fe
--- /dev/null
+++ b/tests/sched/run_loop/intrusive.cpp
@@ -0,0 +1,36 @@
+#include <fiber/sched/run_loop.hpp>
+
+#include <fiber/sched/task/task.hpp>
+#include <gtest/gtest.h>
+
+using namespace fiber;  // NOLINT
+
+class RunLoopTest : public ::testing::Test {};
+
+TEST_F(RunLoopTest, JustWorksIntrusive) {
+  sched::RunLoop loop;
+
+  bool flag = false;
+
+  class SetFlag : public sched::task::TaskBase {
+   public:
+    explicit SetFlag(bool& flag)
+        : flag_(flag) {
+    }
+
+    void Run() noexcept override {
+      flag_ = true;
+    }
+
+   private:
+    bool& flag_;
+  };
+
+  {
+    SetFlag set{flag};
+    loop.Submit(&set);
+    loop.RunNext();
+  }
+
+  EXPECT_TRUE(flag);
+}
diff --git a/tests/sched/run_loop/unit.cpp b/tests/sched/run_loop/unit.cpp
new file mode 100644
index 0000000..07c5d1b
--- /dev/null
+++ b/tests/sched/run_loop/unit.cpp
@@ -0,0 +1,106 @@
+#include <fiber/sched/run_loop.hpp>
+#include <fiber/sched/task/submit.hpp>
+
+#include <fmt/core.h>
+#include <gtest/gtest.h>
+
+using namespace fiber;  // NOLINT
+
+class RunLoopTest : public ::testing::Test {};
+
+TEST_F(RunLoopTest, JustWorks) {
+  sched::RunLoop loop;
+
+  size_t step = 0;
+
+  EXPECT_FALSE(loop.NonEmpty());
+
+  EXPECT_FALSE(loop.RunNext());
+  EXPECT_EQ(loop.RunAtMost(99), 0);
+
+  sched::task::Submit(loop, [&] {
+    step = 1;
+  });
+
+  EXPECT_TRUE(loop.NonEmpty());
+  EXPECT_EQ(step, 0);
+
+  sched::task::Submit(loop, [&] {
+    step = 2;
+  });
+
+  EXPECT_EQ(step, 0);
+  EXPECT_TRUE(loop.RunNext());
+  EXPECT_EQ(step, 1);
+  EXPECT_TRUE(loop.NonEmpty());
+
+  sched::task::Submit(loop, [&] {
+    step = 3;
+  });
+
+  EXPECT_EQ(loop.RunAtMost(99), 2);
+  EXPECT_EQ(step, 3);
+  EXPECT_FALSE(loop.NonEmpty());
+  EXPECT_FALSE(loop.RunNext());
+}
+
+TEST_F(RunLoopTest, Empty) {
+  sched::RunLoop loop;
+
+  EXPECT_FALSE(loop.RunNext());
+  EXPECT_EQ(loop.RunAtMost(7), 0);
+  EXPECT_EQ(loop.Run(), 0);
+}
+
+void Countdown(sched::RunLoop& loop, size_t k) {
+  if (k > 0) {
+    sched::task::Submit(loop, [&loop, k] {
+      Countdown(loop, k - 1);
+    });
+  }
+}
+
+TEST_F(RunLoopTest, RunAtMost) {
+  sched::RunLoop loop;
+
+  Countdown(loop, 256);
+
+  size_t tasks = 0;
+  do {
+    tasks += loop.RunAtMost(7);
+  } while (loop.NonEmpty());
+
+  fmt::println("{}", tasks);
+
+  EXPECT_EQ(tasks, 256);
+}
+
+TEST_F(RunLoopTest, RunAtMostNewTasks) {
+  sched::RunLoop loop;
+
+  sched::task::Submit(loop, [&]() {
+    sched::task::Submit(loop, []() {});
+  });
+
+  EXPECT_EQ(loop.RunAtMost(2), 2);
+}
+
+TEST_F(RunLoopTest, Run) {
+  sched::RunLoop loop;
+
+  Countdown(loop, 117);
+
+  EXPECT_EQ(loop.Run(), 117);
+}
+
+TEST_F(RunLoopTest, RunTwice) {
+  sched::RunLoop loop;
+
+  Countdown(loop, 11);
+
+  EXPECT_EQ(loop.Run(), 11);
+
+  Countdown(loop, 7);
+
+  EXPECT_EQ(loop.Run(), 7);
+}
diff --git a/tests/sched/thread_pool/intrusive.cpp b/tests/sched/thread_pool/intrusive.cpp
new file mode 100644
index 0000000..f928930
--- /dev/null
+++ b/tests/sched/thread_pool/intrusive.cpp
@@ -0,0 +1,40 @@
+#include <fiber/sched/thread_pool.hpp>
+#include <fiber/sync/wait_group.hpp>
+
+#include <fiber/sched/task/task.hpp>
+#include <gtest/gtest.h>
+
+using namespace fiber;  // NOLINT
+
+class ThreadPoolTest : public ::testing::Test {};
+
+TEST_F(ThreadPoolTest, JustWorksIntrusive) {
+  sched::ThreadPool pool{4};
+  pool.Start();
+
+  class DoWork : public sched::task::TaskBase {
+   public:
+    explicit DoWork(sync::WaitGroup& wg)
+        : wg_(wg) {
+    }
+
+    void Run() noexcept override {
+      wg_.Done();
+    }
+
+   private:
+    sync::WaitGroup& wg_;
+  };
+
+  {
+    sync::WaitGroup wg;
+
+    DoWork work{wg};
+
+    wg.Add(1);
+    pool.Submit(&work);
+    wg.Wait();
+  }
+
+  pool.Stop();
+}
diff --git a/tests/sched/thread_pool/unit.cpp b/tests/sched/thread_pool/unit.cpp
new file mode 100644
index 0000000..39968fd
--- /dev/null
+++ b/tests/sched/thread_pool/unit.cpp
@@ -0,0 +1,265 @@
+#include <fiber/sched/thread_pool.hpp>
+#include <fiber/sched/task/submit.hpp>
+#include <fiber/sync/wait_group.hpp>
+
+#include <wheels/core/stop_watch.hpp>
+#include <gtest/gtest.h>
+
+#include <atomic>
+#include <chrono>
+#include <thread>
+
+using namespace std::chrono_literals;  // NOLINT
+using namespace fiber;                 // NOLINT
+
+class ThreadPoolTest : public ::testing::Test {};
+
+TEST_F(ThreadPoolTest, WaitTask) {
+  sched::ThreadPool pool{4};
+  pool.Start();
+
+  sync::WaitGroup wg;
+  wg.Add(1);
+  sched::task::Submit(pool, [&wg] {
+    wg.Done();
+  });
+
+  wg.Wait();
+  pool.Stop();
+}
+
+TEST_F(ThreadPoolTest, Wait) {
+  sched::ThreadPool pool{4};
+  pool.Start();
+
+  sync::WaitGroup wg;
+  wg.Add(1);
+  sched::task::Submit(pool, [&wg] {
+    std::this_thread::sleep_for(1s);
+    wg.Done();
+  });
+
+  wg.Wait();
+  pool.Stop();
+}
+
+TEST_F(ThreadPoolTest, MultiWait) {
+  sched::ThreadPool pool{1};
+  pool.Start();
+
+  for (size_t i = 0; i < 3; ++i) {
+    sync::WaitGroup wg;
+    wg.Add(1);
+    sched::task::Submit(pool, [&wg] {
+      std::this_thread::sleep_for(1s);
+      wg.Done();
+    });
+
+    wg.Wait();
+  }
+
+  pool.Stop();
+}
+
+TEST_F(ThreadPoolTest, ManyTasks) {
+  sched::ThreadPool pool{4};
+  pool.Start();
+
+  static const size_t kTasks = 17;
+  sync::WaitGroup wg;
+
+  for (size_t i = 0; i < kTasks; ++i) {
+    wg.Add(1);
+    sched::task::Submit(pool, [&wg] {
+      wg.Done();
+    });
+  }
+
+  wg.Wait();
+  pool.Stop();
+}
+
+TEST_F(ThreadPoolTest, Parallel) {
+  sched::ThreadPool pool{4};
+  pool.Start();
+
+  std::atomic<bool> fast{false};
+  sync::WaitGroup wg;
+
+  wg.Add(1);
+  sched::task::Submit(pool, [&] {
+    std::this_thread::sleep_for(1s);
+    wg.Done();
+  });
+
+  wg.Add(1);
+  sched::task::Submit(pool, [&] {
+    fast.store(true);
+    wg.Done();
+  });
+
+  std::this_thread::sleep_for(100ms);
+  ASSERT_TRUE(fast.load());
+
+  wg.Wait();
+  pool.Stop();
+}
+
+TEST_F(ThreadPoolTest, TwoPools) {
+  sched::ThreadPool pool1{1};
+  sched::ThreadPool pool2{1};
+
+  pool1.Start();
+  pool2.Start();
+
+  wheels::StopWatch stop_watch;
+
+  sync::WaitGroup wg1;
+  wg1.Add(1);
+  sched::task::Submit(pool1, [&] {
+    std::this_thread::sleep_for(1s);
+    wg1.Done();
+  });
+
+  sync::WaitGroup wg2;
+  wg2.Add(1);
+  sched::task::Submit(pool2, [&] {
+    std::this_thread::sleep_for(1s);
+    wg2.Done();
+  });
+
+  wg2.Wait();
+  pool2.Stop();
+
+  wg1.Wait();
+  pool1.Stop();
+
+  ASSERT_TRUE(stop_watch.Elapsed() < 1500ms);
+}
+
+TEST_F(ThreadPoolTest, Current) {
+  sched::ThreadPool pool{1};
+  pool.Start();
+
+  ASSERT_EQ(sched::ThreadPool::Current(), nullptr);
+
+  sync::WaitGroup wg;
+  wg.Add(1);
+
+  sched::task::Submit(pool, [&] {
+    ASSERT_EQ(sched::ThreadPool::Current(), &pool);
+    wg.Done();
+  });
+
+  wg.Wait();
+  pool.Stop();
+}
+
+TEST_F(ThreadPoolTest, SubmitAfterWait) {
+  sched::ThreadPool pool{4};
+  pool.Start();
+
+  sync::WaitGroup wg;
+
+  wg.Add(1);
+  sched::task::Submit(pool, [&] {
+    std::this_thread::sleep_for(500ms);
+
+    wg.Add(1);
+    sched::task::Submit(pool, [&] {
+      std::this_thread::sleep_for(500ms);
+      wg.Done();
+    });
+
+    wg.Done();
+  });
+
+  wg.Wait();
+  pool.Stop();
+}
+
+TEST_F(ThreadPoolTest, UseThreads) {
+  sched::ThreadPool pool{4};
+  pool.Start();
+
+  sync::WaitGroup wg;
+
+  for (size_t i = 0; i < 4; ++i) {
+    wg.Add(1);
+    sched::task::Submit(pool, [&wg] {
+      std::this_thread::sleep_for(750ms);
+      wg.Done();
+    });
+  }
+
+  wg.Wait();
+  pool.Stop();
+}
+
+TEST_F(ThreadPoolTest, TooManyThreads) {
+  sched::ThreadPool pool{3};
+  pool.Start();
+
+  sync::WaitGroup wg;
+
+  for (size_t i = 0; i < 4; ++i) {
+    wg.Add(1);
+    sched::task::Submit(pool, [&wg] {
+      std::this_thread::sleep_for(750ms);
+      wg.Done();
+    });
+  }
+
+  wheels::StopWatch stop_watch;
+  wg.Wait();
+  pool.Stop();
+
+  ASSERT_TRUE(stop_watch.Elapsed() > 1s);
+}
+
+TEST_F(ThreadPoolTest, TaskLifetime) {
+  sched::ThreadPool pool{4};
+  pool.Start();
+
+  struct Widget {};
+  auto w = std::make_shared<Widget>();
+
+  sync::WaitGroup wg;
+  for (int i = 0; i < 4; ++i) {
+    wg.Add(1);
+    sched::task::Submit(pool, [w, &wg] {
+      wg.Done();
+    });
+  }
+
+  std::this_thread::sleep_for(500ms);
+  ASSERT_EQ(w.use_count(), 1);
+
+  wg.Wait();
+  pool.Stop();
+}
+
+TEST_F(ThreadPoolTest, Racy) {
+  sched::ThreadPool pool{4};
+  pool.Start();
+
+  std::atomic<size_t> racy_counter{0};
+  static const size_t kTasks = 100500;
+
+  sync::WaitGroup wg;
+
+  for (size_t i = 0; i < kTasks; ++i) {
+    wg.Add(1);
+    sched::task::Submit(pool, [&] {
+      int old = racy_counter.load();
+      racy_counter.store(old + 1);
+      wg.Done();
+    });
+  }
+
+  wg.Wait();
+  pool.Stop();
+
+  std::cout << "Racy counter value: " << racy_counter.load() << std::endl;
+  ASSERT_LE(racy_counter.load(), kTasks);
+}