From ce5f676bf3355fa7cc1b51ded8b2010922bcaae7 Mon Sep 17 00:00:00 2001 From: Wan Shen Lim Date: Wed, 20 Jan 2021 19:42:33 -0500 Subject: [PATCH] Replace dirty spinwaits with pthread cond waits in messenger test. (#1444) --- test/messenger/messenger_test.cpp | 113 +++++++++++++++++++++++++----- 1 file changed, 95 insertions(+), 18 deletions(-) diff --git a/test/messenger/messenger_test.cpp b/test/messenger/messenger_test.cpp index 98eb009e29..239e758c50 100644 --- a/test/messenger/messenger_test.cpp +++ b/test/messenger/messenger_test.cpp @@ -1,5 +1,6 @@ #include "messenger/messenger.h" +#include #include #include @@ -112,22 +113,47 @@ TEST_F(MessengerTests, BasicReplicationTest) { done[1] = false; done[2] = false; - auto spin_until_init = [init]() { + auto *pmutex = static_cast( + mmap(nullptr, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0)); + NOISEPAGE_ASSERT(MAP_FAILED != pmutex, "mmap() failed."); + pthread_mutexattr_t mutexattr; + pthread_mutexattr_init(&mutexattr); + pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(pmutex, &mutexattr); + + auto *pcond = static_cast( + mmap(nullptr, sizeof(pthread_cond_t), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0)); + NOISEPAGE_ASSERT(MAP_FAILED != pcond, "mmap() failed."); + pthread_condattr_t condattr; + pthread_condattr_init(&condattr); + pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED); + pthread_cond_init(pcond, &condattr); + + auto sleep_until_init = [init, pmutex, pcond]() { while (!(init[0] && init[1] && init[2])) { + pthread_mutex_lock(pmutex); + pthread_cond_wait(pcond, pmutex); + pthread_mutex_unlock(pmutex); } }; - auto spin_until_done = [done]() { + auto sleep_until_done = [done, pmutex, pcond]() { while (!(done[0] && done[1] && done[2])) { + pthread_mutex_lock(pmutex); + pthread_cond_wait(pcond, pmutex); + pthread_mutex_unlock(pmutex); } }; + auto wake_all = [pcond]() { pthread_cond_broadcast(pcond); }; + VoidFn primary_fn = [=]() { auto primary = BuildDBMain(port_primary, port_messenger_primary, "primary"); primary->GetNetworkLayer()->GetServer()->RunServer(); init[0] = true; - spin_until_init(); + wake_all(); + sleep_until_init(); DirtySleep(); while (!(done[1] && done[2])) { @@ -135,7 +161,8 @@ TEST_F(MessengerTests, BasicReplicationTest) { MESSENGER_LOG_TRACE("Primary done."); done[0] = true; - spin_until_done(); + wake_all(); + sleep_until_done(); MESSENGER_LOG_TRACE("Primary exit."); primary->ForceShutdown(); }; @@ -145,7 +172,8 @@ TEST_F(MessengerTests, BasicReplicationTest) { replica1->GetNetworkLayer()->GetServer()->RunServer(); init[1] = true; - spin_until_init(); + wake_all(); + sleep_until_init(); DirtySleep(); // Set up a connection to the primary. @@ -186,7 +214,8 @@ TEST_F(MessengerTests, BasicReplicationTest) { MESSENGER_LOG_TRACE("Replica 1 done."); done[1] = true; - spin_until_done(); + wake_all(); + sleep_until_done(); MESSENGER_LOG_TRACE("Replica 1 exit."); replica1->ForceShutdown(); }; @@ -196,7 +225,8 @@ TEST_F(MessengerTests, BasicReplicationTest) { replica2->GetNetworkLayer()->GetServer()->RunServer(); init[2] = true; - spin_until_init(); + wake_all(); + sleep_until_init(); DirtySleep(); // Set up a connection to the primary. @@ -237,7 +267,8 @@ TEST_F(MessengerTests, BasicReplicationTest) { MESSENGER_LOG_TRACE("Replica 2 done."); done[2] = true; - spin_until_done(); + wake_all(); + sleep_until_done(); MESSENGER_LOG_TRACE("Replica 2 exit."); replica2->ForceShutdown(); }; @@ -245,8 +276,12 @@ TEST_F(MessengerTests, BasicReplicationTest) { std::vector pids = ForkTests({primary_fn, replica1_fn, replica2_fn}); // Spin until all done. - while (!(done[0] && done[1] && done[2])) { - } + sleep_until_done(); + + pthread_mutex_destroy(pmutex); + pthread_mutexattr_destroy(&mutexattr); + pthread_cond_destroy(pcond); + pthread_condattr_destroy(&condattr); DirtySleep(); { @@ -257,6 +292,14 @@ TEST_F(MessengerTests, BasicReplicationTest) { UNUSED_ATTRIBUTE int munmap_retval = munmap(static_cast(const_cast(done)), 3 * sizeof(bool)); NOISEPAGE_ASSERT(-1 != munmap_retval, "munmap() failed."); } + { + UNUSED_ATTRIBUTE int munmap_retval = munmap(static_cast(pmutex), sizeof(pthread_mutex_t)); + NOISEPAGE_ASSERT(-1 != munmap_retval, "munmap() failed."); + } + { + UNUSED_ATTRIBUTE int munmap_retval = munmap(static_cast(pcond), sizeof(pthread_cond_t)); + NOISEPAGE_ASSERT(-1 != munmap_retval, "munmap() failed."); + } } // NOLINTNEXTLINE @@ -283,16 +326,34 @@ TEST_F(MessengerTests, BasicListenTest) { done[0] = false; done[1] = false; - auto spin_until_init = [init]() { + auto *pmutex = static_cast( + mmap(nullptr, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0)); + NOISEPAGE_ASSERT(MAP_FAILED != pmutex, "mmap() failed."); + pthread_mutexattr_t mutexattr; + pthread_mutexattr_init(&mutexattr); + pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(pmutex, &mutexattr); + + auto *pcond = static_cast( + mmap(nullptr, sizeof(pthread_cond_t), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0)); + NOISEPAGE_ASSERT(MAP_FAILED != pcond, "mmap() failed."); + pthread_condattr_t condattr; + pthread_condattr_init(&condattr); + pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED); + pthread_cond_init(pcond, &condattr); + + auto sleep_until_init = [init]() { while (!(init[0] && init[1])) { } }; - auto spin_until_done = [done]() { + auto sleep_until_done = [done]() { while (!(done[0] && done[1])) { } }; + auto wake_all = [pcond]() { pthread_cond_broadcast(pcond); }; + VoidFn primary_fn = [=]() { auto primary = BuildDBMain(port_primary, port_messenger_primary, "primary"); primary->GetNetworkLayer()->GetServer()->RunServer(); @@ -311,7 +372,8 @@ TEST_F(MessengerTests, BasicListenTest) { }); init[0] = true; - spin_until_init(); + wake_all(); + sleep_until_init(); DirtySleep(); while (!done[1]) { @@ -319,7 +381,8 @@ TEST_F(MessengerTests, BasicListenTest) { MESSENGER_LOG_TRACE("Primary done."); done[0] = true; - spin_until_done(); + wake_all(); + sleep_until_done(); MESSENGER_LOG_TRACE("Primary exit."); primary->ForceShutdown(); }; @@ -328,7 +391,8 @@ TEST_F(MessengerTests, BasicListenTest) { auto replica1 = BuildDBMain(port_replica1, port_messenger_replica1, "replica1"); replica1->GetNetworkLayer()->GetServer()->RunServer(); init[1] = true; - spin_until_init(); + wake_all(); + sleep_until_init(); DirtySleep(); // Set up a connection to the primary via the listen endpoint. @@ -363,7 +427,8 @@ TEST_F(MessengerTests, BasicListenTest) { MESSENGER_LOG_TRACE("Replica 1 done."); done[1] = true; - spin_until_done(); + wake_all(); + sleep_until_done(); MESSENGER_LOG_TRACE("Replica 1 exit."); replica1->ForceShutdown(); }; @@ -371,8 +436,12 @@ TEST_F(MessengerTests, BasicListenTest) { std::vector pids = ForkTests({primary_fn, replica1_fn}); // Spin until all done. - while (!(done[0] && done[1])) { - } + sleep_until_done(); + + pthread_mutex_destroy(pmutex); + pthread_mutexattr_destroy(&mutexattr); + pthread_cond_destroy(pcond); + pthread_condattr_destroy(&condattr); DirtySleep(); { @@ -383,6 +452,14 @@ TEST_F(MessengerTests, BasicListenTest) { UNUSED_ATTRIBUTE int munmap_retval = munmap(static_cast(const_cast(done)), 2 * sizeof(bool)); NOISEPAGE_ASSERT(-1 != munmap_retval, "munmap() failed."); } + { + UNUSED_ATTRIBUTE int munmap_retval = munmap(static_cast(pmutex), sizeof(pthread_mutex_t)); + NOISEPAGE_ASSERT(-1 != munmap_retval, "munmap() failed."); + } + { + UNUSED_ATTRIBUTE int munmap_retval = munmap(static_cast(pcond), sizeof(pthread_cond_t)); + NOISEPAGE_ASSERT(-1 != munmap_retval, "munmap() failed."); + } } } // namespace noisepage::messenger