Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue #2288 #2293

Merged
merged 1 commit into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conf/follower.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ log_file_max_size = "10GB"
log_file_rotate_count = 10

# trace/debug/info/warning/error/critical 6 log levels, default: info
log_level = "debug"
log_level = "trace"

[storage]
persistence_dir = "/var/infinity/follower/persistence"
Expand Down
2 changes: 1 addition & 1 deletion conf/leader.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ log_file_max_size = "10GB"
log_file_rotate_count = 10

# trace/debug/info/warning/error/critical 6 log levels, default: info
log_level = "debug"
log_level = "trace"

[storage]
persistence_dir = "/var/infinity/leader/persistence"
Expand Down
1 change: 1 addition & 0 deletions python/test_cluster/infinity_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ def clear(self):
for runner in self.runners.values():
runner.uninit()
self.runners.clear()
self.leader_runner = None

def clear_log(self):
log_file = self._log_filename()
Expand Down
8 changes: 3 additions & 5 deletions python/test_cluster/test_member_change.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,10 @@ def test_cluster_leader_follower_change(cluster : InfinityCluster):
cluster.remove_node("node2")


@pytest.mark.skip(reason="bugs")
@pytest.mark.parametrize("kill", [False, True])
# @pytest.mark.parametrize("kill", [False, True])
@pytest.mark.parametrize("leader_shutdown", [True, False])
def test_cluster_shutdown_and_recover(
cluster: InfinityCluster, kill: bool, leader_shutdown: bool
):
def test_cluster_shutdown_and_recover(cluster: InfinityCluster, leader_shutdown: bool):
kill = False
with cluster:
logger = cluster.logger

Expand Down
25 changes: 15 additions & 10 deletions src/storage/persistence/obj_stat_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ module obj_stat_accessor;
import infinity_exception;
import logger;
import third_party;
import obj_status;

namespace infinity {

Expand Down Expand Up @@ -100,10 +101,10 @@ void ObjectStatMap::Recover(const String &key) {
UnrecoverableError(fmt::format("Recover object {} ref count is {}", key, lru_iter->obj_stat_.ref_count_));
}
lru_list_.splice(lru_list_.begin(), cleanuped_list_, lru_iter);
if (obj_stat.cached_) {
auto expect = ObjCached::kNotCached;
if (not obj_stat.cached_.compare_exchange_strong(expect, ObjCached::kCached)) {
UnrecoverableError(fmt::format("Recover object {} not cleaned", key));
}
obj_stat.cached_ = true;
}

Optional<ObjStat> ObjectStatMap::Invalidate(const String &key) {
Expand All @@ -116,7 +117,11 @@ Optional<ObjStat> ObjectStatMap::Invalidate(const String &key) {
if (obj_stat.ref_count_ > 0) {
UnrecoverableError(fmt::format("Invalidate object {} ref count is {}", key, obj_stat.ref_count_));
}
if (obj_stat.cached_) {
ObjCached cached = obj_stat.cached_.load();
if (cached == ObjCached::kDownloading) {
UnrecoverableError(fmt::format("Invalidate object {} is downloading", key));
}
if (cached == ObjCached::kCached) {
lru_list_.erase(lru_iter);
} else {
cleanuped_list_.erase(lru_iter);
Expand All @@ -134,10 +139,10 @@ LRUListEntry *ObjectStatMap::EnvictLast() {
if (obj_stat.ref_count_ > 0) {
UnrecoverableError(fmt::format("EnvictLast object {} ref count is {}", lru_iter->key_, obj_stat.ref_count_));
}
if (!obj_stat.cached_) {
auto expect = ObjCached::kCached;
if (not obj_stat.cached_.compare_exchange_strong(expect, ObjCached::kNotCached)) {
UnrecoverableError(fmt::format("EnvictLast object {} is already cleaned", lru_iter->key_));
}
obj_stat.cached_ = false;
cleanuped_list_.splice(cleanuped_list_.begin(), lru_list_, lru_iter);
return &(*lru_iter);
}
Expand Down Expand Up @@ -192,12 +197,12 @@ void ObjectStatAccessor_LocalStorage::PutNew(const String &key, ObjStat obj_stat
if (map_iter != obj_map_.end()) {
UnrecoverableError(fmt::format("PutNew object {} is already in object map", key));
}
obj_stat.cached_ = true;
obj_stat.cached_ = ObjCached::kCached;
obj_map_.emplace_hint(map_iter, key, std::move(obj_stat));
}

void ObjectStatAccessor_LocalStorage::PutNoCount(const String &key, ObjStat obj_stat) {
obj_stat.cached_ = true;
obj_stat.cached_ = ObjCached::kCached;
auto [iter, insert_ok] = obj_map_.insert_or_assign(key, std::move(obj_stat));
if (!insert_ok) {
LOG_DEBUG(fmt::format("PutNew: {} is already in object map", key));
Expand Down Expand Up @@ -243,8 +248,8 @@ void ObjectStatAccessor_LocalStorage::Deserialize(const nlohmann::json &obj) {
String obj_key = json_pair["obj_key"];
ObjStat obj_stat;
obj_stat.Deserialize(json_pair["obj_stat"]);
obj_stat.cached_ = true;
obj_map_.emplace(obj_key, obj_stat);
obj_stat.cached_ = ObjCached::kCached;
obj_map_.emplace(obj_key, std::move(obj_stat));
LOG_TRACE(fmt::format("Deserialize added object {}", obj_key));
}
}
Expand Down Expand Up @@ -331,7 +336,7 @@ void ObjectStatAccessor_ObjectStorage::Deserialize(const nlohmann::json &obj) {
String obj_key = json_pair["obj_key"];
ObjStat obj_stat;
obj_stat.Deserialize(json_pair["obj_stat"]);
obj_stat.cached_ = false;
obj_stat.cached_ = ObjCached::kNotCached;
obj_map_.PutNew(obj_key, std::move(obj_stat));
LOG_TRACE(fmt::format("Deserialize added object {}", obj_key));
}
Expand Down
36 changes: 34 additions & 2 deletions src/storage/persistence/obj_status.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,49 @@ export struct Range {
bool Intersect(const Range &rhs) const { return start_ < rhs.end_ && rhs.start_ < end_; }
};

export enum class ObjCached {
kNotCached,
kDownloading,
kCached,
};

export struct ObjStat {
SizeT obj_size_{}; // footer (if present) is excluded
SizeT parts_{}; // an object attribute
SizeT ref_count_{}; // the number of user (R and W) of some part of this object
Set<Range> deleted_ranges_{};

bool cached_ = true; // whether the object is in localdisk cache
Atomic<ObjCached> cached_ = ObjCached::kCached; // whether the object is in localdisk cache

ObjStat() = default;

ObjStat(SizeT obj_size, SizeT parts, SizeT ref_count, bool cached = true) : obj_size_(obj_size), parts_(parts), ref_count_(ref_count), cached_(cached) {}
ObjStat(SizeT obj_size, SizeT parts, SizeT ref_count, ObjCached cached = ObjCached::kCached) : obj_size_(obj_size), parts_(parts), ref_count_(ref_count), cached_(cached) {}

ObjStat(const ObjStat &other) : obj_size_(other.obj_size_), parts_(other.parts_), ref_count_(other.ref_count_), deleted_ranges_(other.deleted_ranges_), cached_(other.cached_.load()) {}

ObjStat &operator=(const ObjStat &other) {
if (this != &other) {
obj_size_ = other.obj_size_;
parts_ = other.parts_;
ref_count_ = other.ref_count_;
deleted_ranges_ = other.deleted_ranges_;
cached_.store(other.cached_.load());
}
return *this;
}

ObjStat(ObjStat &&other) : obj_size_(other.obj_size_), parts_(other.parts_), ref_count_(other.ref_count_), deleted_ranges_(std::move(other.deleted_ranges_)), cached_(other.cached_.load()) {}

ObjStat &operator=(ObjStat &&other) {
if (this != &other) {
obj_size_ = other.obj_size_;
parts_ = other.parts_;
ref_count_ = other.ref_count_;
deleted_ranges_ = std::move(other.deleted_ranges_);
cached_.store(other.cached_.load());
}
return *this;
}

nlohmann::json Serialize() const;

Expand Down
23 changes: 17 additions & 6 deletions src/storage/persistence/persist_result_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import infinity_context;
import peer_task;
import logger;
import admin_statement;
import obj_status;

namespace infinity {

Expand All @@ -51,13 +52,23 @@ void PersistResultHandler::HandleWriteResult(const PersistWriteResult &result) {
}

ObjAddr PersistResultHandler::HandleReadResult(const PersistReadResult &result) {
if (!result.cached_) {
String read_path = InfinityContext::instance().persistence_manager()->GetObjPath(result.obj_addr_.obj_key_);
VirtualStore::DownloadObject(read_path, result.obj_addr_.obj_key_);
LOG_TRACE(fmt::format("GetObjCache download object {}", read_path));
result.obj_stat_->cached_ = true;
if (result.obj_stat_ != nullptr) {
ObjCached expect = ObjCached::kNotCached;
Atomic<ObjCached> &cached = result.obj_stat_->cached_;
if (cached.compare_exchange_strong(expect, ObjCached::kDownloading)) {
String read_path = InfinityContext::instance().persistence_manager()->GetObjPath(result.obj_addr_.obj_key_);
LOG_TRACE(fmt::format("GetObjCache download object {}.", read_path));
VirtualStore::DownloadObject(read_path, result.obj_addr_.obj_key_);
LOG_TRACE(fmt::format("GetObjCache download object {} done.", read_path));
cached.store(ObjCached::kCached);
cached.notify_all();
} else if (expect == ObjCached::kDownloading) {
LOG_TRACE(fmt::format("GetObjCache waiting downloading object {}", result.obj_addr_.obj_key_));
cached.wait(ObjCached::kDownloading);
LOG_TRACE(fmt::format("GetObjCache finish waiting object {}", result.obj_addr_.obj_key_));
}
}
return result.obj_addr_;
}

}
} // namespace infinity
11 changes: 3 additions & 8 deletions src/storage/persistence/persistence_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ PersistReadResult PersistenceManager::GetObjCache(const String &file_path) {
if (it == local_path_obj_.end()) {
String error_message = fmt::format("GetObjCache Failed to find object for local path {}", local_path);
LOG_WARN(error_message);
result.cached_ = true;
return result;
}
result.obj_addr_ = it->second;
Expand All @@ -262,16 +261,13 @@ PersistReadResult PersistenceManager::GetObjCache(const String &file_path) {
String error_message = fmt::format("GetObjCache object {} is empty", it->second.obj_key_);
UnrecoverableError(error_message);
}
result.cached_ = true;
} else if (ObjStat *obj_stat = objects_->Get(it->second.obj_key_); obj_stat != nullptr) {
LOG_TRACE(fmt::format("GetObjCache object {}, file_path: {}, ref count {}", it->second.obj_key_, file_path, obj_stat->ref_count_));
String read_path = GetObjPath(result.obj_addr_.obj_key_);
if (!VirtualStore::Exists(read_path)) {
obj_stat->cached_ = false;
result.cached_ = false;
auto expect = ObjCached::kCached;
obj_stat->cached_.compare_exchange_strong(expect, ObjCached::kNotCached);
result.obj_stat_ = obj_stat;
} else {
result.cached_ = true;
}
} else {
if (it->second.obj_key_ != current_object_key_) {
Expand All @@ -280,7 +276,6 @@ PersistReadResult PersistenceManager::GetObjCache(const String &file_path) {
}
current_object_ref_count_++;
LOG_TRACE(fmt::format("GetObjCache current object {} ref count {}", it->second.obj_key_, current_object_ref_count_));
result.cached_ = true;
}
return result;
}
Expand Down Expand Up @@ -634,7 +629,7 @@ void AddrSerializer::InitializeValid(PersistenceManager *persistence_manager) {
UnrecoverableError(fmt::format("Invalid object address for path {}", paths_[i]));
} else {
ObjStat obj_stat = persistence_manager->GetObjStatByObjAddr(obj_addr);
obj_stats_[i] = obj_stat;
obj_stats_[i] = std::move(obj_stat);
}
}
}
Expand Down
1 change: 0 additions & 1 deletion src/storage/persistence/persistence_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ export struct PersistWriteResult {

export struct PersistReadResult {
ObjAddr obj_addr_; // where data should read from
bool cached_; // whether the object is in localdisk cache
Vector<String> drop_keys_; // object that should be removed from local disk. because of 1. disk used over limit
Vector<String> drop_from_remote_keys_; // object that should be removed from remote storage. because of object's all parts are deleted
ObjStat *obj_stat_{nullptr}; // object stat
Expand Down
2 changes: 1 addition & 1 deletion src/unit_test/storage/persistence/obj_stat_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ TEST_F(ObjectStatMapTest, test1) {

ObjStat *stat2 = obj_map.GetNoCount("key2");
EXPECT_NE(stat2, nullptr);
EXPECT_FALSE(stat2->cached_);
EXPECT_EQ(stat2->cached_, ObjCached::kNotCached);
Optional<ObjStat> stat2_opt = obj_map.Invalidate("key2");
EXPECT_TRUE(stat2_opt.has_value());
stat2 = obj_map.GetNoCount("key2");
Expand Down