Skip to content

Commit

Permalink
Cherrypick fixes (#83)
Browse files Browse the repository at this point in the history
* resolve deadlock in file watcher on_modified_event

* improve unit test

* resolve the bug in multiple listeners case in file watcher

* ad manual token parser

* bump conan version

Co-authored-by: Ravi Akella email = [email protected] <raakella@sdsbuild07>
  • Loading branch information
raakella1 and Ravi Akella email = [email protected] authored Jan 27, 2023
1 parent 0f889a9 commit 4e5d86a
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 48 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

class SISLConan(ConanFile):
name = "sisl"
version = "8.2.7"
version = "8.2.8"
homepage = "https://github.com/eBay/sisl"
description = "Library for fast data structures, utilities"
topics = ("ebay", "components", "core", "efficiency")
Expand Down
3 changes: 3 additions & 0 deletions include/sisl/auth_manager/trf_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class TrfClient {
protected:
// acquire unique lock before calling
virtual void request_with_grant_token();
void parse_response(const std::string& resp);
static std::string get_string(const std::string& resp, const std::string& pattern);
static std::string get_quoted_string(const std::string& resp, const std::string& pattern);

protected:
std::string m_access_token;
Expand Down
2 changes: 1 addition & 1 deletion include/sisl/file_watcher/file_watcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class FileWatcher {
private:
int m_inotify_fd;
std::map< std::string, FileInfo > m_files;
std::mutex m_files_lock;
mutable std::mutex m_files_lock;
std::unique_ptr< std::thread > m_fw_thread;
// This is used to notify poll loop to exit
int m_pipefd[2] = {-1, -1};
Expand Down
32 changes: 23 additions & 9 deletions src/auth_manager/tests/AuthTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ class MockTrfClient : public TrfClient {

void set_expiry(std::chrono::system_clock::time_point tp) { m_expiry = tp; }
std::string get_access_token() { return m_access_token; }
std::chrono::system_clock::time_point get_expiry() { return m_expiry; }

void parse_token(const std::string& resp) { TrfClient::parse_response(resp); }
};

static void load_trf_settings() {
Expand Down Expand Up @@ -220,14 +223,8 @@ static const std::string trf_token_server_ip{"127.0.0.1"};
static const uint32_t trf_token_server_port{12346};
static std::string token_response;
static void set_token_response(const std::string& raw_token) {
token_response = "{\n"
" \"access_token\": \"" +
raw_token +
"\",\n"
" \"token_type\": \"Bearer\",\n"
" \"expires_in\": 2000,\n"
" \"refresh_token\": \"dummy_refresh_token\"\n"
"}";
token_response = "{\"access_token\":\"" + raw_token +
"\",\"token_type\":\"Bearer\",\"expires_in\":2000,\"refresh_token\":\"dummy_refresh_token\"}\n";
}

class TokenApiImpl : public TokenApi {
Expand Down Expand Up @@ -256,7 +253,10 @@ class TrfClientTest : public ::testing::Test {
APIBase::start();
}

virtual void TearDown() override { APIBase::stop(); }
virtual void TearDown() override {
APIBase::stop();
remove_grant_path();
}

private:
std::unique_ptr< TokenApiImpl > m_token_server;
Expand Down Expand Up @@ -297,6 +297,20 @@ TEST_F(TrfClientTest, request_with_grant_token) {
EXPECT_EQ(raw_token, mock_trf_client.get_access_token());
EXPECT_EQ("Bearer", mock_trf_client.get_token_type());
}

TEST(TrfClientParseTest, parse_token) {
load_trf_settings();
MockTrfClient mock_trf_client;
const auto raw_token{TestToken().sign_rs256()};
set_token_response(raw_token);
EXPECT_TRUE(mock_trf_client.get_access_token().empty());
EXPECT_TRUE(mock_trf_client.get_token_type().empty());
mock_trf_client.parse_token(token_response);
EXPECT_EQ(raw_token, mock_trf_client.get_access_token());
EXPECT_EQ("Bearer", mock_trf_client.get_token_type());
EXPECT_TRUE(mock_trf_client.get_expiry() > std::chrono::system_clock::now());
remove_grant_path();
}
} // namespace sisl::testing

using namespace sisl;
Expand Down
1 change: 0 additions & 1 deletion src/auth_manager/tests/dummy_grant.cg

This file was deleted.

30 changes: 29 additions & 1 deletion src/auth_manager/trf_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,38 @@ void TrfClient::request_with_grant_token() {
m_access_token = resp_json["access_token"];
m_token_type = resp_json["token_type"];
} catch ([[maybe_unused]] const nlohmann::detail::exception& e) {
LOGDEBUG("parsing token response failed, what: {}", e.what());
LOGERROR("parsing token response using json failed, what: {}; trying to parse manually", e.what());
parse_response(resp.text);
}
}

void TrfClient::parse_response(const std::string& resp) {
try {
static std::string token1{"{\"access_token\":"};
static std::string token2{"\"token_type\":"};
static std::string token3{"\"expires_in\":"};

if (m_access_token = get_quoted_string(resp, token1); m_access_token.empty()) { return; }
if (m_token_type = get_quoted_string(resp, token2); m_access_token.empty()) { return; }
auto expiry_str = get_string(resp, token3);
if (expiry_str.empty()) { return; }
m_expiry = std::chrono::system_clock::now() + std::chrono::seconds(std::stol(expiry_str));
} catch (const std::exception& e) { LOGDEBUG("failed to parse pattern, what: {}", e.what()); }
}

std::string TrfClient::get_string(const std::string& resp, const std::string& pattern) {
auto n = resp.find(pattern);
if (n == std::string::npos) { return ""; }
auto n1 = resp.find(',', n);
if (n1 == std::string::npos) { return ""; }
return resp.substr(n + pattern.length(), n1 - n - pattern.length());
}

std::string TrfClient::get_quoted_string(const std::string& resp, const std::string& pattern) {
auto quoted_string{get_string(resp, pattern)};
return quoted_string.substr(1, quoted_string.length() - 2);
}

std::string TrfClient::get_token() {
{
std::shared_lock< std::shared_mutex > lock(m_mtx);
Expand Down
14 changes: 8 additions & 6 deletions src/file_watcher/file_watcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ bool FileWatcher::register_listener(const std::string& file_path, const std::str
{
auto lk = std::unique_lock< std::mutex >(m_files_lock);
if (const auto it{m_files.find(file_path)}; it != m_files.end()) {
auto file_info = it->second;
auto& file_info = it->second;
file_info.m_handlers.emplace(listener_id, file_event_handler);
LOGDEBUG("File path {} exists, adding the handler cb for the listener {}", file_path, listener_id);
return true;
Expand Down Expand Up @@ -186,16 +186,18 @@ void FileWatcher::handle_events() {
}

void FileWatcher::on_modified_event(const int wd, const bool is_deleted) {
auto lk = std::unique_lock< std::mutex >(m_files_lock);
FileInfo file_info;
get_fileinfo(wd, file_info);
if (is_deleted) {
// There is a corner case (very unlikely) where a new listener
// regestered for this filepath after the current delete event was triggered.
{
auto lk = std::unique_lock< std::mutex >(m_files_lock);
m_files.erase(file_info.m_filepath);
}
for (const auto& [id, handler] : file_info.m_handlers) {
handler(file_info.m_filepath, true);
}
// There is a corner case (very unlikely) where a new listener
// regestered for this filepath after the current delete event was triggered.
m_files.erase(file_info.m_filepath);
return;
}

Expand Down Expand Up @@ -243,8 +245,8 @@ bool FileWatcher::get_file_contents(const std::string& file_name, std::string& c
return false;
}

// Hold the m_files_lock before calling this method.
void FileWatcher::get_fileinfo(const int wd, FileInfo& file_info) const {
auto lk = std::unique_lock< std::mutex >(m_files_lock);
for (const auto& [file_path, file] : m_files) {
if (file.m_wd == wd) {
file_info = file;
Expand Down
113 changes: 84 additions & 29 deletions src/file_watcher/file_watcher_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,55 +21,110 @@ using namespace ::testing;

class FileWatcherTest : public ::testing::Test {
public:
std::shared_ptr< FileWatcher > file_watcher;
virtual void SetUp() override {
file_watcher = std::make_shared< FileWatcher >();
EXPECT_TRUE(file_watcher->start());
m_file_change_params.file_watcher = std::make_shared< FileWatcher >();
EXPECT_TRUE(m_file_change_params.file_watcher->start());
}

virtual void TearDown() override { EXPECT_TRUE(file_watcher->stop()); }
virtual void TearDown() override {
EXPECT_TRUE(m_file_change_params.file_watcher->stop());
std::remove(m_file_change_params.file_str.c_str());
}

std::mutex file_change_lock;
std::condition_variable file_change_cv;
struct FileChangeParams {
std::shared_ptr< FileWatcher > file_watcher;
std::string file_str;
std::mutex file_change_lock;
std::condition_variable file_change_cv;
bool is_deleted;
int cb_call_count;
};
FileChangeParams m_file_change_params;
};

void monitor_file_changes(FileWatcherTest::FileChangeParams& file_change_params, const std::string& listener) {
EXPECT_TRUE(file_change_params.file_watcher->register_listener(
file_change_params.file_str, listener,
[&file_change_params, listener](const std::string filepath, const bool deleted) {
EXPECT_EQ(file_change_params.file_str, filepath);
{
std::lock_guard< std::mutex > lg(file_change_params.file_change_lock);
file_change_params.is_deleted = deleted;
file_change_params.cb_call_count--;
}
if (deleted) {
std::ofstream file_of{file_change_params.file_str};
monitor_file_changes(file_change_params, listener);
}
file_change_params.file_change_cv.notify_one();
}));
}

TEST_F(FileWatcherTest, basic_watcher) {
const auto file_path = fs::current_path() / "basic_test.txt";
// remove if exists and then create a new file
fs::remove(file_path);
const std::string file_str{file_path.string()};
std::ofstream file_of{file_str};
bool is_deleted = true;
bool cb_called = false;

EXPECT_TRUE(file_watcher->register_listener(
file_str, "basic_test_listener",
[this, &is_deleted, &cb_called, &file_str](const std::string filepath, const bool deleted) {
EXPECT_EQ(file_str, filepath);
{
std::lock_guard< std::mutex > lg(file_change_lock);
is_deleted = deleted;
cb_called = true;
}
file_change_cv.notify_one();
}));
m_file_change_params.file_str = file_path.string();
std::ofstream file_of{m_file_change_params.file_str};
m_file_change_params.is_deleted = true;
m_file_change_params.cb_call_count = 1;

monitor_file_changes(m_file_change_params, "basic_listener");

// edit the file
file_of << "Hello World!";
file_of.flush();
{
auto lk = std::unique_lock< std::mutex >(file_change_lock);
EXPECT_TRUE(file_change_cv.wait_for(lk, std::chrono::milliseconds(500), [&cb_called]() { return cb_called; }));
EXPECT_FALSE(is_deleted);
cb_called = false; // set it false for the next iteration of the test
auto lk = std::unique_lock< std::mutex >(m_file_change_params.file_change_lock);
EXPECT_TRUE(m_file_change_params.file_change_cv.wait_for(
lk, std::chrono::milliseconds(1500), [this]() { return m_file_change_params.cb_call_count == 0; }));
EXPECT_FALSE(m_file_change_params.is_deleted);
m_file_change_params.cb_call_count = 1; // set it 1 for the next iteration of the test
}

// remove the file
fs::remove(file_path);
{
auto lk = std::unique_lock< std::mutex >(file_change_lock);
EXPECT_TRUE(file_change_cv.wait_for(lk, std::chrono::milliseconds(500), [&cb_called]() { return cb_called; }));
EXPECT_TRUE(is_deleted);
auto lk = std::unique_lock< std::mutex >(m_file_change_params.file_change_lock);
EXPECT_TRUE(m_file_change_params.file_change_cv.wait_for(
lk, std::chrono::milliseconds(1500), [this]() { return m_file_change_params.cb_call_count == 0; }));
EXPECT_TRUE(m_file_change_params.is_deleted);
m_file_change_params.cb_call_count = 1; // set it 1 for the next iteration of the test
}

/* TODO fix this in CI.
std::ofstream file_of1{m_file_change_params.file_str};
file_of1 << "Hello World Again!";
file_of1.flush();
{
auto lk = std::unique_lock< std::mutex >(m_file_change_params.file_change_lock);
EXPECT_TRUE(m_file_change_params.file_change_cv.wait_for(
lk, std::chrono::milliseconds(1500), [this]() { return m_file_change_params.cb_call_count == 0; }));
EXPECT_FALSE(m_file_change_params.is_deleted);
}
*/
}

TEST_F(FileWatcherTest, multiple_watchers) {
const auto file_path = fs::current_path() / "basic_test.txt";
// remove if exists and then create a new file
fs::remove(file_path);
m_file_change_params.file_str = file_path.string();
std::ofstream file_of{m_file_change_params.file_str};
m_file_change_params.is_deleted = true;
m_file_change_params.cb_call_count = 2;

monitor_file_changes(m_file_change_params, "basic_listener1");
monitor_file_changes(m_file_change_params, "basic_listener2");

// edit the file
file_of << "Hello World!";
file_of.flush();
{
auto lk = std::unique_lock< std::mutex >(m_file_change_params.file_change_lock);
EXPECT_TRUE(m_file_change_params.file_change_cv.wait_for(
lk, std::chrono::milliseconds(1500), [this]() { return m_file_change_params.cb_call_count == 0; }));
EXPECT_FALSE(m_file_change_params.is_deleted);
}
}

Expand Down

0 comments on commit 4e5d86a

Please sign in to comment.