Skip to content

Commit

Permalink
Refactor writer->admin (#2253)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

_Briefly describe what this PR aims to solve. Include background context
that will help reviewers understand the purpose of the PR._

### Type of change

- [x] Refactoring

Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN authored Nov 17, 2024
1 parent bc05b3f commit 00c90d6
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 3 deletions.
75 changes: 73 additions & 2 deletions src/storage/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,6 @@ Status Storage::AdminToWriter() {
}

// Used for follower and learner
Status Storage::InitToReader() { return Status::OK(); }

Status Storage::UnInitFromReader() {
LOG_INFO(fmt::format("Start to change storage from readable mode to un-init"));
{
Expand Down Expand Up @@ -427,6 +425,64 @@ Status Storage::ReaderToWriter() {
return Status::OK();
}

Status Storage::WriterToAdmin() {
if (periodic_trigger_thread_ != nullptr) {
periodic_trigger_thread_->Stop();
periodic_trigger_thread_.reset();
}

if (compact_processor_ != nullptr) {
compact_processor_->Stop(); // Different from Readable
compact_processor_.reset(); // Different from Readable
}

if (bg_processor_ != nullptr) {
bg_processor_->Stop();
bg_processor_.reset();
}

new_catalog_.reset();

memory_index_tracer_.reset();

if (wal_mgr_ != nullptr) {
wal_mgr_->Stop();
wal_mgr_.reset();
}

if (txn_mgr_ != nullptr) {
txn_mgr_->Stop();
txn_mgr_.reset();
}

if (buffer_mgr_ != nullptr) {
buffer_mgr_->Stop();
buffer_mgr_.reset();
}

if (result_cache_manager_ != nullptr) {
result_cache_manager_.reset();
}

if (cleanup_info_tracer_ != nullptr) {
cleanup_info_tracer_.reset();
}

// wal_manager stop won't reset many member. We need to recreate the wal_manager object.
wal_mgr_ = MakeUnique<WalManager>(this,
config_ptr_->WALDir(),
config_ptr_->DataDir(),
config_ptr_->WALCompactThreshold(),
config_ptr_->DeltaCheckpointThreshold(),
config_ptr_->FlushMethodAtCommit());

std::unique_lock<std::mutex> lock(mutex_);
current_storage_mode_ = StorageMode::kAdmin;
return Status::OK();
}
Status WriterToReader() { return Status::OK(); }
Status UnInitFromWriter() { return Status::OK(); }

ResultCacheManager *Storage::result_cache_manager() const noexcept {
if (config_ptr_->ResultCache() != "on") {
return nullptr;
Expand Down Expand Up @@ -503,6 +559,21 @@ Status Storage::SetStorageMode(StorageMode target_mode) {
break;
}
case StorageMode::kWritable: {
switch (target_mode) {
case StorageMode::kUnInitialized: {
break;
}
case StorageMode::kAdmin: {
return WriterToAdmin();
}
case StorageMode::kReadable: {
break;
}
case StorageMode::kWritable: {
UnrecoverableError("Attempt to set storage mode from Writable to Writable");
}
}

if (target_mode == StorageMode::kWritable) {
UnrecoverableError("Attempt to set storage mode from Writable to Writable");
}
Expand Down
6 changes: 5 additions & 1 deletion src/storage/storage.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,15 @@ public:
Status AdminToWriter();

// Used for follower and learner
Status InitToReader();
Status ReaderToAdmin();
Status ReaderToWriter();
Status UnInitFromReader();

// Used for leader and standalone
Status WriterToAdmin();
Status WriterToReader();
Status UnInitFromWriter();

void AttachCatalog(const FullCatalogFileInfo &full_ckp_info, const Vector<DeltaCatalogFileInfo> &delta_ckp_infos);
void LoadFullCheckpoint(const String &checkpoint_path);
void AttachDeltaCheckpoint(const String &checkpoint_path);
Expand Down

0 comments on commit 00c90d6

Please sign in to comment.