Skip to content
This repository has been archived by the owner on Dec 21, 2023. It is now read-only.

Commit

Permalink
the last dance: implemented thread-safe stop watch
Browse files Browse the repository at this point in the history
  • Loading branch information
guihao-liang committed May 8, 2020
1 parent fb7394b commit ca4f444
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 54 deletions.
61 changes: 43 additions & 18 deletions src/core/storage/fileio/read_caching_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,17 @@ class StopWatch {
StopWatch(const StopWatch&) = delete;
StopWatch(StopWatch&& rhs) = delete;

/*
* start will register the thread id so that client can use if (stop() == 0)
* to decide whether stop watch should be reallocated.
*
* ideally, use start once and close it once with stop.
* This impl allows double start.
* */
void start() {
std::lock_guard<std::mutex> lk(mx_);

if (thread_set_.size() == 0) {
if (threads_.size() == 0) {
using milliseconds = std::chrono::milliseconds;
auto lag = std::chrono::duration_cast<milliseconds>(
std::chrono::steady_clock::now() - beg_);
Expand All @@ -75,16 +82,16 @@ class StopWatch {
}
}

if (!thread_set_.insert(std::this_thread::get_id()).second) {
logstream(LOG_DEBUG) << "this thread " << std::this_thread::get_id()
if (!threads_.insert(std::this_thread::get_id()).second) {
logstream(LOG_ERROR) << "this thread " << std::this_thread::get_id()
<< "already starts the clock" << std::endl;
}
}

bool is_time_to_record() {
std::lock_guard<std::mutex> lock(mx_);

if (thread_set_.size() == 0) return false;
if (threads_.size() == 0) return false;

// reach to timepoint, log it.
auto cur = steady_clock::now();
Expand All @@ -100,27 +107,31 @@ class StopWatch {
/*
* @return: int. number of threads still holding
*/
int stop() {
int stop(bool no_throw = false) {
std::lock_guard<std::mutex> lk(mx_);
if (thread_set_.count(std::this_thread::get_id())) {
thread_set_.erase(std::this_thread::get_id());

if (threads_.count(std::this_thread::get_id())) {
threads_.erase(std::this_thread::get_id());
// last man standing
if (thread_set_.size() == 0) {
if (threads_.size() == 0) {
end_ = steady_clock::now();
return 0;
}
// still running
return thread_set_.size();
return threads_.size();
}

return thread_set_.size();
// no-op
if (no_throw) return threads_.size();

std_log_and_throw(std::logic_error, "you don't own this stop watch");
}

template <class Output>
Output duration() const {
std::lock_guard<std::mutex> lk(mx_);
// the clock is still on
if (thread_set_.count(std::this_thread::get_id())) {
if (!threads_.empty()) {
return std::chrono::duration_cast<Output>(steady_clock::now() - beg_);
}

Expand All @@ -129,7 +140,7 @@ class StopWatch {

// singleton destroyed since it's non-copy and movable,
// any dangling pointer or reference to this stop_watah is illegal
~StopWatch() { stop(); }
~StopWatch() { stop(true); }

private:
uint64_t interval_;
Expand Down Expand Up @@ -180,16 +191,19 @@ class read_caching_device {
auto iter = m_filename_to_filesize_map.find(filename);
if (iter != m_filename_to_filesize_map.end()) {
m_file_size = iter->second;
m_filename_to_stop_watch[filename]->start();
} else {
m_contents = std::make_shared<T>(filename, write);
m_file_size = m_contents->file_size();
m_filename_to_filesize_map[filename] = m_file_size;
// report downloading every 30s
if (m_filename_to_filesize_map.find(filename) !=
m_filename_to_filesize_map.end()) {
if (m_filename_to_stop_watch.find(filename) ==
m_filename_to_stop_watch.end()) {
m_filename_to_stop_watch[filename] =
std::unique_ptr<StopWatchSec_t>(new StopWatchSec_t(30));
}
// start the clock immediately to register the thread id into it
m_filename_to_stop_watch[filename]->start();
}
} else {
m_contents = std::make_shared<T>(filename, write);
Expand Down Expand Up @@ -220,7 +234,9 @@ class read_caching_device {

auto iter = m_filename_to_stop_watch.find(m_filename);
if (iter != m_filename_to_stop_watch.end()) {
if (!iter->stop()) {
// use no throw since this nanny thread may not start clock
if (iter->second->stop(/* no throw */ true) == 0) {
// nobody is holding
m_filename_to_stop_watch.erase(iter);
}
}
Expand All @@ -231,9 +247,14 @@ class read_caching_device {
m_contents.reset();
{
std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
if (m_filename_to_stop_watch.find(m_filename) !=
m_filename_to_stop_watch.end())
m_filename_to_stop_watch[m_filename]->stop();
m_filename_to_filesize_map.erase(m_filename);
auto iter = m_filename_to_stop_watch.find(m_filename);
if (iter != m_filename_to_stop_watch.end())
// use no throw since this nanny thread may not start clock
if (iter->second->stop(/* no throw */ true) == 0) {
// nobody is holding
m_filename_to_stop_watch.erase(iter);
}
}
}
}
Expand Down Expand Up @@ -280,6 +301,8 @@ class read_caching_device {
std::streamsize write(const char* strm_ptr, std::streamsize n) {
{
std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
if (m_filename_to_filesize_map.count(m_filename) == 0)
log_and_throw("write through closed files handle");
m_filename_to_stop_watch[m_filename]->start();
}

Expand Down Expand Up @@ -381,6 +404,8 @@ class read_caching_device {

{
std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
if (m_filename_to_filesize_map.count(m_filename) == 0)
log_and_throw("read through closed files handle");
m_filename_to_stop_watch[m_filename]->start();
}

Expand Down
3 changes: 0 additions & 3 deletions src/core/storage/fileio/s3_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,9 +540,6 @@ list_objects_response list_objects_impl(const s3url& parsed_url,
<< "list_objects_impl failed:" << ret.error << std::endl;
} else {
// continue retry
logprogress_stream << "list_objects retry on "
<< parsed_url.string_from_s3url(false)
<< std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(backoff));
backoff *= 2;
}
Expand Down
1 change: 1 addition & 0 deletions test/fileio/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ make_boost_test(fixed_size_cache_manager_test.cxx REQUIRES
unity_shared_for_testing)
make_boost_test(general_fstream_test.cxx REQUIRES unity_shared_for_testing)
make_boost_test(parse_hdfs_url_test.cxx REQUIRES unity_shared_for_testing)
make_boost_test(stop_watch_test.cxx REQUIRES unity_shared_for_testing)

if(${TC_BUILD_REMOTEFS})
make_executable(s3_filesys_test SOURCES s3_filesys_test.cpp REQUIRES
Expand Down
102 changes: 69 additions & 33 deletions test/fileio/stop_watch_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -10,86 +10,116 @@ struct stop_watch_test {
using my_watch_t = StopWatch<milliseconds>;

public:
void test_end_without_start(void) {
my_watch_t watch = my_watch_t(1);
watch.end();
TS_ASSERT_EQUALS(watch.templace duration<milliseconds>(), milliseconds{0});
TS_ASSERT_THROWS_NOTHING(watch.end());
void test_stop_without_start(void) {
my_watch_t watch(1);
TS_ASSERT_THROWS_ANYTHING(watch.stop());
}

void test_double_start(void) {
my_watch_t watch = my_watch_t(1);
my_watch_t watch(1);
watch.start();
TS_ASSERT_THROWS_ANYTHING(watch.start());
// no double start
TS_ASSERT_THROWS_NOTHING(watch.start());
}

void test_single_thread(void) {
my_watch_t watch = my_watch_t(1);
my_watch_t watch(1);
watch.start();
std::this_thread::sleep_for(milliseconds(2));
TS_ASSERT(watch.templace duration<milliseconds>() >= milliseconds(2));
watch.end();
TS_ASSERT(watch.templace duration<milliseconds>() >= milliseconds(2));
TS_ASSERT(watch.duration<milliseconds>() >= milliseconds(2));
watch.stop();
TS_ASSERT(watch.duration<milliseconds>() >= milliseconds(2));
}

// main thread's stop watch stops as the last
void test_multi_thread_1(void) {
my_watch_t watch = my_watch_t(1);
my_watch_t watch(1);
watch.start();
auto start = std::chrono::steady_clock::now();
std::thread t1([&watch]() {
watch.start();
watch.end();
watch.stop();
});

std::thread t2([&watch]() {
std::this_thread::sleep_for(milliseconds(1));
watch.start();
std::this_thread::sleep_for(milliseconds(1));
watch.end();
watch.stop();
});
std::this_thread::sleep_for(milliseconds(10));
t1.join();
t2.join();
auto end = std::chrono::steady_clock::now();
watch.end();
auto stop = std::chrono::steady_clock::now();
watch.stop();

TS_ASSERT(watch.templace duration<milliseconds>() >=
std::chrono::duration_cast<milliseconds>(end - start));
TS_ASSERT(watch.duration<milliseconds>() >=
std::chrono::duration_cast<milliseconds>(stop - start));
}

// t2 stops as the last one
void test_multi_thread_2(void) {
my_watch_t watch = my_watch_t(1);
my_watch_t watch(1);
watch.start();
auto start = std::chrono::steady_clock::now();

std::thread t1([&watch]() {
watch.start();
watch.end();
TS_ASSERT(watch.stop() > 0);
});

std::thread t2([&watch]() {
std::this_thread::sleep_for(milliseconds(1));
watch.start();
std::this_thread::sleep_for(milliseconds(15));
watch.end();
std::this_thread::sleep_for(milliseconds(25));
// t2 is the last one to stop the watch
TS_ASSERT(watch.stop() == 0);
});

t1.join();
std::this_thread::sleep_for(milliseconds(1));
auto end = std::chrono::steady_clock::now();
watch.end();
TS_ASSERT(watch.templace duration<milliseconds>() >=
std::chrono::duration_cast<milliseconds>(end - start));
TS_ASSERT(watch.templace duration<milliseconds>() <= milliseconds > (15));
std::this_thread::sleep_for(milliseconds(5));
// main also stops
auto stop = std::chrono::steady_clock::now();
TS_ASSERT(watch.stop() > 0);

// the clock is still on becuase t2 is still running
TS_ASSERT(watch.duration<milliseconds>() >=
std::chrono::duration_cast<milliseconds>(stop - start));
TS_ASSERT(watch.duration<milliseconds>() >= milliseconds(5));

t2.join();
TS_ASSERT(watch.duration<milliseconds>() >= milliseconds(25));
}

void test_stop_and_continue(void) {
my_watch_t watch(1);
watch.start();
watch.stop();
// interval is less than 150ms
std::this_thread::sleep_for(milliseconds(3));
watch.start();
watch.stop();
std::this_thread::sleep_for(milliseconds(3));
watch.start();
watch.stop();

TS_ASSERT(watch.templace duration<milliseconds>() >= milliseconds > (15));
watch.end();
TS_ASSERT(watch.templace duration<milliseconds>() >= milliseconds > (15));
TS_ASSERT(watch.duration<milliseconds>() >= milliseconds(6));
}

void test_time_to_record(void) {
my_watch_t watch(5);
watch.start();
TS_ASSERT(watch.is_time_to_record() == true);
TS_ASSERT(watch.is_time_to_record() == false);
std::this_thread::sleep_for(milliseconds(5));
TS_ASSERT(watch.is_time_to_record() == true);
TS_ASSERT(watch.is_time_to_record() == false);
}
};

BOOST_FIXTURE_TEST_SUITE(_stop_watch_test, stop_watch_test)
BOOST_AUTO_TEST_CASE(test_end_without_start) {
stop_watch_test::test_end_without_start();
BOOST_AUTO_TEST_CASE(test_stop_without_start) {
stop_watch_test::test_stop_without_start();
}
BOOST_AUTO_TEST_CASE(test_double_start) {
stop_watch_test::test_double_start();
Expand All @@ -103,4 +133,10 @@ BOOST_AUTO_TEST_CASE(test_multi_thread_1) {
BOOST_AUTO_TEST_CASE(test_multi_thread_2) {
stop_watch_test::test_multi_thread_2();
}
BOOST_AUTO_TEST_CASE(test_stop_and_continue) {
stop_watch_test::test_stop_and_continue();
}
BOOST_AUTO_TEST_CASE(test_time_to_record) {
stop_watch_test::test_time_to_record();
}
BOOST_AUTO_TEST_SUITE_END()

0 comments on commit ca4f444

Please sign in to comment.