Skip to content
This repository has been archived by the owner on Feb 20, 2023. It is now read-only.

Commit

Permalink
Replace dirty spinwaits with pthread cond waits in messenger test. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lmwnshn authored Jan 21, 2021
1 parent f45d0ed commit ce5f676
Showing 1 changed file with 95 additions and 18 deletions.
113 changes: 95 additions & 18 deletions test/messenger/messenger_test.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "messenger/messenger.h"

#include <pthread.h>
#include <sys/mman.h>
#include <unistd.h>

Expand Down Expand Up @@ -112,30 +113,56 @@ TEST_F(MessengerTests, BasicReplicationTest) {
done[1] = false;
done[2] = false;

auto spin_until_init = [init]() {
auto *pmutex = static_cast<pthread_mutex_t *>(
mmap(nullptr, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0));
NOISEPAGE_ASSERT(MAP_FAILED != pmutex, "mmap() failed.");
pthread_mutexattr_t mutexattr;
pthread_mutexattr_init(&mutexattr);
pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(pmutex, &mutexattr);

auto *pcond = static_cast<pthread_cond_t *>(
mmap(nullptr, sizeof(pthread_cond_t), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0));
NOISEPAGE_ASSERT(MAP_FAILED != pcond, "mmap() failed.");
pthread_condattr_t condattr;
pthread_condattr_init(&condattr);
pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED);
pthread_cond_init(pcond, &condattr);

auto sleep_until_init = [init, pmutex, pcond]() {
while (!(init[0] && init[1] && init[2])) {
pthread_mutex_lock(pmutex);
pthread_cond_wait(pcond, pmutex);
pthread_mutex_unlock(pmutex);
}
};

auto spin_until_done = [done]() {
auto sleep_until_done = [done, pmutex, pcond]() {
while (!(done[0] && done[1] && done[2])) {
pthread_mutex_lock(pmutex);
pthread_cond_wait(pcond, pmutex);
pthread_mutex_unlock(pmutex);
}
};

auto wake_all = [pcond]() { pthread_cond_broadcast(pcond); };

VoidFn primary_fn = [=]() {
auto primary = BuildDBMain(port_primary, port_messenger_primary, "primary");
primary->GetNetworkLayer()->GetServer()->RunServer();

init[0] = true;
spin_until_init();
wake_all();
sleep_until_init();
DirtySleep();

while (!(done[1] && done[2])) {
}

MESSENGER_LOG_TRACE("Primary done.");
done[0] = true;
spin_until_done();
wake_all();
sleep_until_done();
MESSENGER_LOG_TRACE("Primary exit.");
primary->ForceShutdown();
};
Expand All @@ -145,7 +172,8 @@ TEST_F(MessengerTests, BasicReplicationTest) {
replica1->GetNetworkLayer()->GetServer()->RunServer();

init[1] = true;
spin_until_init();
wake_all();
sleep_until_init();
DirtySleep();

// Set up a connection to the primary.
Expand Down Expand Up @@ -186,7 +214,8 @@ TEST_F(MessengerTests, BasicReplicationTest) {

MESSENGER_LOG_TRACE("Replica 1 done.");
done[1] = true;
spin_until_done();
wake_all();
sleep_until_done();
MESSENGER_LOG_TRACE("Replica 1 exit.");
replica1->ForceShutdown();
};
Expand All @@ -196,7 +225,8 @@ TEST_F(MessengerTests, BasicReplicationTest) {
replica2->GetNetworkLayer()->GetServer()->RunServer();

init[2] = true;
spin_until_init();
wake_all();
sleep_until_init();
DirtySleep();

// Set up a connection to the primary.
Expand Down Expand Up @@ -237,16 +267,21 @@ TEST_F(MessengerTests, BasicReplicationTest) {

MESSENGER_LOG_TRACE("Replica 2 done.");
done[2] = true;
spin_until_done();
wake_all();
sleep_until_done();
MESSENGER_LOG_TRACE("Replica 2 exit.");
replica2->ForceShutdown();
};

std::vector<pid_t> pids = ForkTests({primary_fn, replica1_fn, replica2_fn});

// Spin until all done.
while (!(done[0] && done[1] && done[2])) {
}
sleep_until_done();

pthread_mutex_destroy(pmutex);
pthread_mutexattr_destroy(&mutexattr);
pthread_cond_destroy(pcond);
pthread_condattr_destroy(&condattr);

DirtySleep();
{
Expand All @@ -257,6 +292,14 @@ TEST_F(MessengerTests, BasicReplicationTest) {
UNUSED_ATTRIBUTE int munmap_retval = munmap(static_cast<void *>(const_cast<bool *>(done)), 3 * sizeof(bool));
NOISEPAGE_ASSERT(-1 != munmap_retval, "munmap() failed.");
}
{
UNUSED_ATTRIBUTE int munmap_retval = munmap(static_cast<void *>(pmutex), sizeof(pthread_mutex_t));
NOISEPAGE_ASSERT(-1 != munmap_retval, "munmap() failed.");
}
{
UNUSED_ATTRIBUTE int munmap_retval = munmap(static_cast<void *>(pcond), sizeof(pthread_cond_t));
NOISEPAGE_ASSERT(-1 != munmap_retval, "munmap() failed.");
}
}

// NOLINTNEXTLINE
Expand All @@ -283,16 +326,34 @@ TEST_F(MessengerTests, BasicListenTest) {
done[0] = false;
done[1] = false;

auto spin_until_init = [init]() {
auto *pmutex = static_cast<pthread_mutex_t *>(
mmap(nullptr, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0));
NOISEPAGE_ASSERT(MAP_FAILED != pmutex, "mmap() failed.");
pthread_mutexattr_t mutexattr;
pthread_mutexattr_init(&mutexattr);
pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(pmutex, &mutexattr);

auto *pcond = static_cast<pthread_cond_t *>(
mmap(nullptr, sizeof(pthread_cond_t), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0));
NOISEPAGE_ASSERT(MAP_FAILED != pcond, "mmap() failed.");
pthread_condattr_t condattr;
pthread_condattr_init(&condattr);
pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED);
pthread_cond_init(pcond, &condattr);

auto sleep_until_init = [init]() {
while (!(init[0] && init[1])) {
}
};

auto spin_until_done = [done]() {
auto sleep_until_done = [done]() {
while (!(done[0] && done[1])) {
}
};

auto wake_all = [pcond]() { pthread_cond_broadcast(pcond); };

VoidFn primary_fn = [=]() {
auto primary = BuildDBMain(port_primary, port_messenger_primary, "primary");
primary->GetNetworkLayer()->GetServer()->RunServer();
Expand All @@ -311,15 +372,17 @@ TEST_F(MessengerTests, BasicListenTest) {
});

init[0] = true;
spin_until_init();
wake_all();
sleep_until_init();
DirtySleep();

while (!done[1]) {
}

MESSENGER_LOG_TRACE("Primary done.");
done[0] = true;
spin_until_done();
wake_all();
sleep_until_done();
MESSENGER_LOG_TRACE("Primary exit.");
primary->ForceShutdown();
};
Expand All @@ -328,7 +391,8 @@ TEST_F(MessengerTests, BasicListenTest) {
auto replica1 = BuildDBMain(port_replica1, port_messenger_replica1, "replica1");
replica1->GetNetworkLayer()->GetServer()->RunServer();
init[1] = true;
spin_until_init();
wake_all();
sleep_until_init();
DirtySleep();

// Set up a connection to the primary via the listen endpoint.
Expand Down Expand Up @@ -363,16 +427,21 @@ TEST_F(MessengerTests, BasicListenTest) {

MESSENGER_LOG_TRACE("Replica 1 done.");
done[1] = true;
spin_until_done();
wake_all();
sleep_until_done();
MESSENGER_LOG_TRACE("Replica 1 exit.");
replica1->ForceShutdown();
};

std::vector<pid_t> pids = ForkTests({primary_fn, replica1_fn});

// Spin until all done.
while (!(done[0] && done[1])) {
}
sleep_until_done();

pthread_mutex_destroy(pmutex);
pthread_mutexattr_destroy(&mutexattr);
pthread_cond_destroy(pcond);
pthread_condattr_destroy(&condattr);

DirtySleep();
{
Expand All @@ -383,6 +452,14 @@ TEST_F(MessengerTests, BasicListenTest) {
UNUSED_ATTRIBUTE int munmap_retval = munmap(static_cast<void *>(const_cast<bool *>(done)), 2 * sizeof(bool));
NOISEPAGE_ASSERT(-1 != munmap_retval, "munmap() failed.");
}
{
UNUSED_ATTRIBUTE int munmap_retval = munmap(static_cast<void *>(pmutex), sizeof(pthread_mutex_t));
NOISEPAGE_ASSERT(-1 != munmap_retval, "munmap() failed.");
}
{
UNUSED_ATTRIBUTE int munmap_retval = munmap(static_cast<void *>(pcond), sizeof(pthread_cond_t));
NOISEPAGE_ASSERT(-1 != munmap_retval, "munmap() failed.");
}
}

} // namespace noisepage::messenger

0 comments on commit ce5f676

Please sign in to comment.