From ca4f444471b3a1701e1d73eea90595a9f95ca67d Mon Sep 17 00:00:00 2001 From: Guihao Liang Date: Fri, 8 May 2020 11:57:26 -0700 Subject: [PATCH] the last dance: implemented thread-safe stop watch --- .../storage/fileio/read_caching_device.hpp | 61 +++++++---- src/core/storage/fileio/s3_api.cpp | 3 - test/fileio/CMakeLists.txt | 1 + test/fileio/stop_watch_test.cxx | 102 ++++++++++++------ 4 files changed, 113 insertions(+), 54 deletions(-) diff --git a/src/core/storage/fileio/read_caching_device.hpp b/src/core/storage/fileio/read_caching_device.hpp index 2beec2950a..7f41c9e4b2 100644 --- a/src/core/storage/fileio/read_caching_device.hpp +++ b/src/core/storage/fileio/read_caching_device.hpp @@ -62,10 +62,17 @@ class StopWatch { StopWatch(const StopWatch&) = delete; StopWatch(StopWatch&& rhs) = delete; + /* + * start will register the thread id so that client can use if (stop() == 0) + * to decide whether stop watch should be reallocated. + * + * ideally, use start once and close it once with stop. + * This impl allows double start. + * */ void start() { std::lock_guard lk(mx_); - if (thread_set_.size() == 0) { + if (threads_.size() == 0) { using milliseconds = std::chrono::milliseconds; auto lag = std::chrono::duration_cast( std::chrono::steady_clock::now() - beg_); @@ -75,8 +82,8 @@ class StopWatch { } } - if (!thread_set_.insert(std::this_thread::get_id()).second) { - logstream(LOG_DEBUG) << "this thread " << std::this_thread::get_id() + if (!threads_.insert(std::this_thread::get_id()).second) { + logstream(LOG_ERROR) << "this thread " << std::this_thread::get_id() << "already starts the clock" << std::endl; } } @@ -84,7 +91,7 @@ class StopWatch { bool is_time_to_record() { std::lock_guard lock(mx_); - if (thread_set_.size() == 0) return false; + if (threads_.size() == 0) return false; // reach to timepoint, log it. auto cur = steady_clock::now(); @@ -100,27 +107,31 @@ class StopWatch { /* * @return: int. number of threads still holding */ - int stop() { + int stop(bool no_throw = false) { std::lock_guard lk(mx_); - if (thread_set_.count(std::this_thread::get_id())) { - thread_set_.erase(std::this_thread::get_id()); + + if (threads_.count(std::this_thread::get_id())) { + threads_.erase(std::this_thread::get_id()); // last man standing - if (thread_set_.size() == 0) { + if (threads_.size() == 0) { end_ = steady_clock::now(); return 0; } // still running - return thread_set_.size(); + return threads_.size(); } - return thread_set_.size(); + // no-op + if (no_throw) return threads_.size(); + + std_log_and_throw(std::logic_error, "you don't own this stop watch"); } template Output duration() const { std::lock_guard lk(mx_); // the clock is still on - if (thread_set_.count(std::this_thread::get_id())) { + if (!threads_.empty()) { return std::chrono::duration_cast(steady_clock::now() - beg_); } @@ -129,7 +140,7 @@ class StopWatch { // singleton destroyed since it's non-copy and movable, // any dangling pointer or reference to this stop_watah is illegal - ~StopWatch() { stop(); } + ~StopWatch() { stop(true); } private: uint64_t interval_; @@ -180,16 +191,19 @@ class read_caching_device { auto iter = m_filename_to_filesize_map.find(filename); if (iter != m_filename_to_filesize_map.end()) { m_file_size = iter->second; + m_filename_to_stop_watch[filename]->start(); } else { m_contents = std::make_shared(filename, write); m_file_size = m_contents->file_size(); m_filename_to_filesize_map[filename] = m_file_size; // report downloading every 30s - if (m_filename_to_filesize_map.find(filename) != - m_filename_to_filesize_map.end()) { + if (m_filename_to_stop_watch.find(filename) == + m_filename_to_stop_watch.end()) { m_filename_to_stop_watch[filename] = std::unique_ptr(new StopWatchSec_t(30)); } + // start the clock immediately to register the thread id into it + m_filename_to_stop_watch[filename]->start(); } } else { m_contents = std::make_shared(filename, write); @@ -220,7 +234,9 @@ class read_caching_device { auto iter = m_filename_to_stop_watch.find(m_filename); if (iter != m_filename_to_stop_watch.end()) { - if (!iter->stop()) { + // use no throw since this nanny thread may not start clock + if (iter->second->stop(/* no throw */ true) == 0) { + // nobody is holding m_filename_to_stop_watch.erase(iter); } } @@ -231,9 +247,14 @@ class read_caching_device { m_contents.reset(); { std::lock_guard file_size_guard(m_filesize_cache_mutex); - if (m_filename_to_stop_watch.find(m_filename) != - m_filename_to_stop_watch.end()) - m_filename_to_stop_watch[m_filename]->stop(); + m_filename_to_filesize_map.erase(m_filename); + auto iter = m_filename_to_stop_watch.find(m_filename); + if (iter != m_filename_to_stop_watch.end()) + // use no throw since this nanny thread may not start clock + if (iter->second->stop(/* no throw */ true) == 0) { + // nobody is holding + m_filename_to_stop_watch.erase(iter); + } } } } @@ -280,6 +301,8 @@ class read_caching_device { std::streamsize write(const char* strm_ptr, std::streamsize n) { { std::lock_guard file_size_guard(m_filesize_cache_mutex); + if (m_filename_to_filesize_map.count(m_filename) == 0) + log_and_throw("write through closed files handle"); m_filename_to_stop_watch[m_filename]->start(); } @@ -381,6 +404,8 @@ class read_caching_device { { std::lock_guard file_size_guard(m_filesize_cache_mutex); + if (m_filename_to_filesize_map.count(m_filename) == 0) + log_and_throw("read through closed files handle"); m_filename_to_stop_watch[m_filename]->start(); } diff --git a/src/core/storage/fileio/s3_api.cpp b/src/core/storage/fileio/s3_api.cpp index 12eb255a26..eed24ffa98 100644 --- a/src/core/storage/fileio/s3_api.cpp +++ b/src/core/storage/fileio/s3_api.cpp @@ -540,9 +540,6 @@ list_objects_response list_objects_impl(const s3url& parsed_url, << "list_objects_impl failed:" << ret.error << std::endl; } else { // continue retry - logprogress_stream << "list_objects retry on " - << parsed_url.string_from_s3url(false) - << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(backoff)); backoff *= 2; } diff --git a/test/fileio/CMakeLists.txt b/test/fileio/CMakeLists.txt index 519cd502dc..f0bc201868 100644 --- a/test/fileio/CMakeLists.txt +++ b/test/fileio/CMakeLists.txt @@ -12,6 +12,7 @@ make_boost_test(fixed_size_cache_manager_test.cxx REQUIRES unity_shared_for_testing) make_boost_test(general_fstream_test.cxx REQUIRES unity_shared_for_testing) make_boost_test(parse_hdfs_url_test.cxx REQUIRES unity_shared_for_testing) +make_boost_test(stop_watch_test.cxx REQUIRES unity_shared_for_testing) if(${TC_BUILD_REMOTEFS}) make_executable(s3_filesys_test SOURCES s3_filesys_test.cpp REQUIRES diff --git a/test/fileio/stop_watch_test.cxx b/test/fileio/stop_watch_test.cxx index 723d7f1692..a6f51b773e 100644 --- a/test/fileio/stop_watch_test.cxx +++ b/test/fileio/stop_watch_test.cxx @@ -10,86 +10,116 @@ struct stop_watch_test { using my_watch_t = StopWatch; public: - void test_end_without_start(void) { - my_watch_t watch = my_watch_t(1); - watch.end(); - TS_ASSERT_EQUALS(watch.templace duration(), milliseconds{0}); - TS_ASSERT_THROWS_NOTHING(watch.end()); + void test_stop_without_start(void) { + my_watch_t watch(1); + TS_ASSERT_THROWS_ANYTHING(watch.stop()); } void test_double_start(void) { - my_watch_t watch = my_watch_t(1); + my_watch_t watch(1); watch.start(); - TS_ASSERT_THROWS_ANYTHING(watch.start()); + // no double start + TS_ASSERT_THROWS_NOTHING(watch.start()); } void test_single_thread(void) { - my_watch_t watch = my_watch_t(1); + my_watch_t watch(1); watch.start(); std::this_thread::sleep_for(milliseconds(2)); - TS_ASSERT(watch.templace duration() >= milliseconds(2)); - watch.end(); - TS_ASSERT(watch.templace duration() >= milliseconds(2)); + TS_ASSERT(watch.duration() >= milliseconds(2)); + watch.stop(); + TS_ASSERT(watch.duration() >= milliseconds(2)); } + // main thread's stop watch stops as the last void test_multi_thread_1(void) { - my_watch_t watch = my_watch_t(1); + my_watch_t watch(1); watch.start(); auto start = std::chrono::steady_clock::now(); std::thread t1([&watch]() { watch.start(); - watch.end(); + watch.stop(); }); std::thread t2([&watch]() { std::this_thread::sleep_for(milliseconds(1)); watch.start(); std::this_thread::sleep_for(milliseconds(1)); - watch.end(); + watch.stop(); }); std::this_thread::sleep_for(milliseconds(10)); t1.join(); t2.join(); - auto end = std::chrono::steady_clock::now(); - watch.end(); + auto stop = std::chrono::steady_clock::now(); + watch.stop(); - TS_ASSERT(watch.templace duration() >= - std::chrono::duration_cast(end - start)); + TS_ASSERT(watch.duration() >= + std::chrono::duration_cast(stop - start)); } + // t2 stops as the last one void test_multi_thread_2(void) { - my_watch_t watch = my_watch_t(1); + my_watch_t watch(1); watch.start(); auto start = std::chrono::steady_clock::now(); + std::thread t1([&watch]() { watch.start(); - watch.end(); + TS_ASSERT(watch.stop() > 0); }); std::thread t2([&watch]() { std::this_thread::sleep_for(milliseconds(1)); watch.start(); - std::this_thread::sleep_for(milliseconds(15)); - watch.end(); + std::this_thread::sleep_for(milliseconds(25)); + // t2 is the last one to stop the watch + TS_ASSERT(watch.stop() == 0); }); + t1.join(); - std::this_thread::sleep_for(milliseconds(1)); - auto end = std::chrono::steady_clock::now(); - watch.end(); - TS_ASSERT(watch.templace duration() >= - std::chrono::duration_cast(end - start)); - TS_ASSERT(watch.templace duration() <= milliseconds > (15)); + std::this_thread::sleep_for(milliseconds(5)); + // main also stops + auto stop = std::chrono::steady_clock::now(); + TS_ASSERT(watch.stop() > 0); + + // the clock is still on becuase t2 is still running + TS_ASSERT(watch.duration() >= + std::chrono::duration_cast(stop - start)); + TS_ASSERT(watch.duration() >= milliseconds(5)); + t2.join(); + TS_ASSERT(watch.duration() >= milliseconds(25)); + } + + void test_stop_and_continue(void) { + my_watch_t watch(1); + watch.start(); + watch.stop(); + // interval is less than 150ms + std::this_thread::sleep_for(milliseconds(3)); + watch.start(); + watch.stop(); + std::this_thread::sleep_for(milliseconds(3)); + watch.start(); + watch.stop(); - TS_ASSERT(watch.templace duration() >= milliseconds > (15)); - watch.end(); - TS_ASSERT(watch.templace duration() >= milliseconds > (15)); + TS_ASSERT(watch.duration() >= milliseconds(6)); + } + + void test_time_to_record(void) { + my_watch_t watch(5); + watch.start(); + TS_ASSERT(watch.is_time_to_record() == true); + TS_ASSERT(watch.is_time_to_record() == false); + std::this_thread::sleep_for(milliseconds(5)); + TS_ASSERT(watch.is_time_to_record() == true); + TS_ASSERT(watch.is_time_to_record() == false); } }; BOOST_FIXTURE_TEST_SUITE(_stop_watch_test, stop_watch_test) -BOOST_AUTO_TEST_CASE(test_end_without_start) { - stop_watch_test::test_end_without_start(); +BOOST_AUTO_TEST_CASE(test_stop_without_start) { + stop_watch_test::test_stop_without_start(); } BOOST_AUTO_TEST_CASE(test_double_start) { stop_watch_test::test_double_start(); @@ -103,4 +133,10 @@ BOOST_AUTO_TEST_CASE(test_multi_thread_1) { BOOST_AUTO_TEST_CASE(test_multi_thread_2) { stop_watch_test::test_multi_thread_2(); } +BOOST_AUTO_TEST_CASE(test_stop_and_continue) { + stop_watch_test::test_stop_and_continue(); +} +BOOST_AUTO_TEST_CASE(test_time_to_record) { + stop_watch_test::test_time_to_record(); +} BOOST_AUTO_TEST_SUITE_END()