Skip to content

Commit

Permalink
[Feature] Support restoring from a cluster snapshot for shared-data mode (part 4, use gtid to handle dirty tablet metadata)
Browse files Browse the repository at this point in the history

Signed-off-by: xiangguangyxg <[email protected]>
  • Loading branch information
xiangguangyxg committed Jan 1, 2025
1 parent a105bac commit 1a590b0
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 91 deletions.
4 changes: 2 additions & 2 deletions be/src/storage/lake/lake_delvec_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ Status LakeDelvecLoader::load(const TabletSegmentId& tsid, int64_t version, DelV
Status LakeDelvecLoader::load_from_file(const TabletSegmentId& tsid, int64_t version, DelVectorPtr* pdelvec) {
(*pdelvec).reset(new DelVector());
// 2. find in delvec file
std::string filepath = _tablet_manager->tablet_metadata_location(tsid.tablet_id, version);
ASSIGN_OR_RETURN(auto metadata, _tablet_manager->get_tablet_metadata(_lake_io_opts.fs, filepath, _fill_cache));
ASSIGN_OR_RETURN(auto metadata,
_tablet_manager->get_tablet_metadata(tsid.tablet_id, version, _fill_cache, 0, _lake_io_opts.fs));
RETURN_IF_ERROR(
lake::get_del_vec(_tablet_manager, *metadata, tsid.segment_id, _fill_cache, _lake_io_opts, pdelvec->get()));
return Status::OK();
Expand Down
13 changes: 10 additions & 3 deletions be/src/storage/lake/metadata_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,20 @@ namespace starrocks::lake {
template <>
StatusOr<TabletMetadataPtr> MetadataIterator<TabletMetadataPtr>::get_metadata_from_tablet_manager(
const std::string& path) {
auto tablet_metadata = _manager->get_tablet_metadata(path, false);
if (!tablet_metadata.ok() || tablet_metadata.value()->id() == _tablet_id) {
ASSIGN_OR_RETURN(auto tablet_metadata, _manager->get_tablet_metadata(path, false));

if (tablet_metadata->gtid() < _max_gtid) {
return Status::NotFound("no more element");
}

_max_gtid = tablet_metadata->gtid();

if (tablet_metadata->id() == _tablet_id) {
return tablet_metadata;
}

// Handle tablet initial metadata
auto metadata = std::make_shared<TabletMetadata>(*tablet_metadata.value());
auto metadata = std::make_shared<TabletMetadata>(*tablet_metadata);
metadata->set_id(_tablet_id);
return metadata;
}
Expand Down
10 changes: 6 additions & 4 deletions be/src/storage/lake/metadata_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#include <fmt/format.h>

#include <vector>
#include <set>

#include "common/status.h"
#include "storage/lake/tablet_manager.h"
Expand All @@ -28,7 +28,7 @@ class TabletManager;
template <typename T>
class MetadataIterator {
public:
explicit MetadataIterator(TabletManager* manager, int64_t tablet_id, std::vector<std::string> files)
explicit MetadataIterator(TabletManager* manager, int64_t tablet_id, std::set<std::string> files)
: _manager(manager), _tablet_id(tablet_id), _files(std::move(files)), _iter(_files.begin()){};

bool has_next() const { return _iter != _files.end(); }
Expand All @@ -46,8 +46,10 @@ class MetadataIterator {

TabletManager* _manager;
int64_t _tablet_id;
std::vector<std::string> _files;
std::vector<std::string>::iterator _iter;
// Use sorted set
std::set<std::string> _files;
std::set<std::string>::iterator _iter;
int64_t _max_gtid = 0;
};

} // namespace starrocks::lake
4 changes: 0 additions & 4 deletions be/src/storage/lake/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ Status Tablet::delete_metadata(int64_t version) {
return _mgr->delete_tablet_metadata(_id, version);
}

Status Tablet::metadata_exists(int64_t version) {
return _mgr->tablet_metadata_exists(_id, version);
}

Status Tablet::put_txn_log(const TxnLog& log) {
return _mgr->put_txn_log(log);
}
Expand Down
94 changes: 42 additions & 52 deletions be/src/storage/lake/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ std::string TabletManager::tablet_latest_metadata_cache_key(int64_t tablet_id) {
return fmt::format("TL{}", tablet_id);
}

// Resolves the file system for `path` and asks it to drop its local cache entry for that path.
// Returns the file-system creation error if no file system can be created from `path`;
// otherwise returns whatever the file system's drop_local_cache() reports.
Status TabletManager::drop_local_cache(const std::string& path) {
    auto fs_or = FileSystem::CreateSharedFromString(path);
    if (!fs_or.ok()) {
        return fs_or.status();
    }
    return (*fs_or)->drop_local_cache(path);
}

// current lru cache does not support updating value size, so use refill to update.
void TabletManager::update_segment_cache_size(std::string_view key, intptr_t segment_addr_hint) {
// use write lock to protect parallel segment size update
Expand Down Expand Up @@ -239,70 +244,75 @@ Status TabletManager::put_tablet_metadata(const TabletMetadata& metadata) {
return put_tablet_metadata(std::move(metadata_ptr));
}

StatusOr<TabletMetadataPtr> TabletManager::load_tablet_metadata(std::shared_ptr<FileSystem> fs,
const string& metadata_location, bool fill_cache) {
StatusOr<TabletMetadataPtr> TabletManager::load_tablet_metadata(const string& metadata_location, bool fill_cache,
int64_t expected_gtid,
const std::shared_ptr<FileSystem>& fs) {
TEST_ERROR_POINT("TabletManager::load_tablet_metadata");
auto t0 = butil::gettimeofday_us();
auto metadata = std::make_shared<TabletMetadataPB>();
ProtobufFile file(metadata_location, std::move(fs));
ProtobufFile file(metadata_location, fs);
auto s = file.load(metadata.get(), fill_cache);
if (!s.ok()) {
if (s.is_corruption() && config::lake_clear_corrupted_cache) {
auto tmp_fs = FileSystem::CreateSharedFromString(metadata_location);
if (!tmp_fs.ok()) {
LOG(WARNING) << "fail to get file system to clear corrupted cache for " << metadata_location
<< ", error: " << tmp_fs.status();
return s;
}
auto drop_status = (*tmp_fs)->drop_local_cache(metadata_location);
if (drop_status.ok()) {
LOG(INFO) << "clear corrupted cache for " << metadata_location;
// reset metadata
metadata = std::make_shared<TabletMetadataPB>();
// read again
RETURN_IF_ERROR(file.load(metadata.get(), fill_cache));
} else {
auto drop_status = drop_local_cache(metadata_location);
if (!drop_status.ok()) {
LOG(WARNING) << "clear corrupted cache for " << metadata_location << " failed, "
<< "error: " << drop_status;
return s; // return error so load tablet meta can be retried
}
LOG(INFO) << "clear corrupted cache for " << metadata_location;
// reset metadata
metadata = std::make_shared<TabletMetadataPB>();
// read again
RETURN_IF_ERROR(file.load(metadata.get(), fill_cache));
} else {
return s;
}
}

if (expected_gtid > 0 && metadata->gtid() > 0 && expected_gtid != metadata->gtid()) {
auto drop_status = drop_local_cache(metadata_location);
if (!drop_status.ok()) {
LOG(WARNING) << "clear dirty cache for " << metadata_location << " failed, "
<< "error: " << drop_status;
return drop_status;
}
LOG(INFO) << "clear dirty cache for " << metadata_location;
return Status::NotFound("Not found expected tablet metadata");
}

g_get_tablet_metadata_latency << (butil::gettimeofday_us() - t0);
return std::move(metadata);
return metadata;
}

// Looks up the metacache entry stored under the tablet's "latest metadata" key and
// returns the result of that lookup unchanged. Only the cache is consulted here.
TabletMetadataPtr TabletManager::get_latest_cached_tablet_metadata(int64_t tablet_id) {
    const std::string cache_key = tablet_latest_metadata_cache_key(tablet_id);
    return _metacache->lookup_tablet_metadata(cache_key);
}

StatusOr<TabletMetadataPtr> TabletManager::get_tablet_metadata(int64_t tablet_id, int64_t version, bool fill_cache) {
StatusOr<TabletMetadataPtr> TabletManager::get_tablet_metadata(int64_t tablet_id, int64_t version, bool fill_cache,
int64_t expected_gtid,
const std::shared_ptr<FileSystem>& fs) {
if (version <= kInitialVersion) {
// Handle tablet initial metadata
auto initial_metadata = get_tablet_metadata(tablet_initial_metadata_location(tablet_id), fill_cache);
auto initial_metadata =
get_tablet_metadata(tablet_initial_metadata_location(tablet_id), fill_cache, expected_gtid, fs);
if (initial_metadata.ok()) {
auto tablet_metadata = std::make_shared<TabletMetadata>(*initial_metadata.value());
tablet_metadata->set_id(tablet_id);
return tablet_metadata;
}
}
return get_tablet_metadata(tablet_metadata_location(tablet_id, version), fill_cache);
return get_tablet_metadata(tablet_metadata_location(tablet_id, version), fill_cache, expected_gtid, fs);
}

StatusOr<TabletMetadataPtr> TabletManager::get_tablet_metadata(const string& path, bool fill_cache) {
std::shared_ptr<FileSystem> fs;
return get_tablet_metadata(fs, path, fill_cache);
}

StatusOr<TabletMetadataPtr> TabletManager::get_tablet_metadata(std::shared_ptr<FileSystem> fs, const string& path,
bool fill_cache) {
StatusOr<TabletMetadataPtr> TabletManager::get_tablet_metadata(const string& path, bool fill_cache,
int64_t expected_gtid,
const std::shared_ptr<FileSystem>& fs) {
if (auto ptr = _metacache->lookup_tablet_metadata(path); ptr != nullptr) {
TRACE("got cached tablet metadata");
return ptr;
}
ASSIGN_OR_RETURN(auto ptr, load_tablet_metadata(std::move(fs), path, fill_cache));
ASSIGN_OR_RETURN(auto ptr, load_tablet_metadata(path, fill_cache, expected_gtid, fs));
if (fill_cache) {
_metacache->cache_tablet_metadata(path, ptr);
}
Expand All @@ -319,36 +329,16 @@ Status TabletManager::delete_tablet_metadata(int64_t tablet_id, int64_t version)
return fs::delete_file(location);
}

Status TabletManager::tablet_metadata_exists(int64_t tablet_id, int64_t version) {
if (version <= kInitialVersion) {
// Handle tablet initial metadata
auto status = tablet_metadata_exists(tablet_initial_metadata_location(tablet_id));
if (status.ok()) {
return status;
}
}
return tablet_metadata_exists(tablet_metadata_location(tablet_id, version));
}

Status TabletManager::tablet_metadata_exists(const std::string& path) {
if (auto ptr = _metacache->lookup_tablet_metadata(path); ptr != nullptr) {
TRACE("got cached tablet metadata");
return Status::OK();
}
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(path));
return fs->path_exists(path);
}

StatusOr<TabletMetadataIter> TabletManager::list_tablet_metadata(int64_t tablet_id) {
std::vector<std::string> objects{};
std::set<std::string> objects;
// TODO: construct prefix in LocationProvider
std::string prefix = fmt::format("{:016X}_", tablet_id);

auto root = _location_provider->metadata_root_location(tablet_id);
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(root));
auto scan_cb = [&](std::string_view name) {
if (HasPrefixString(name, prefix)) {
objects.emplace_back(join_path(root, name));
objects.insert(join_path(root, name));
}
return true;
};
Expand All @@ -357,7 +347,7 @@ StatusOr<TabletMetadataIter> TabletManager::list_tablet_metadata(int64_t tablet_

if (objects.empty()) {
// Put tablet initial metadata
objects.emplace_back(join_path(root, tablet_initial_metadata_filename()));
objects.insert(join_path(root, tablet_initial_metadata_filename()));
}

return TabletMetadataIter{this, tablet_id, std::move(objects)};
Expand Down
26 changes: 13 additions & 13 deletions be/src/storage/lake/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,24 +80,23 @@ class TabletManager {

Status put_tablet_metadata(const TabletMetadataPtr& metadata);

StatusOr<TabletMetadataPtr> get_tablet_metadata(int64_t tablet_id, int64_t version, bool fill_cache = true);
// When using get_tablet_metadata to determine whether a new version exists in publish version,
// a valid expected_gtid must be passed in.
StatusOr<TabletMetadataPtr> get_tablet_metadata(int64_t tablet_id, int64_t version, bool fill_cache = true,
int64_t expected_gtid = 0,
const std::shared_ptr<FileSystem>& fs = nullptr);

StatusOr<TabletMetadataPtr> get_tablet_metadata(const std::string& path, bool fill_cache = true);
StatusOr<TabletMetadataPtr> get_tablet_metadata(std::shared_ptr<FileSystem> fs, const std::string& path,
bool fill_cache = true);
// Do not use this function unless the path comes from listing the metadata directory.
StatusOr<TabletMetadataPtr> get_tablet_metadata(const std::string& path, bool fill_cache = true,
int64_t expected_gtid = 0,
const std::shared_ptr<FileSystem>& fs = nullptr);

TabletMetadataPtr get_latest_cached_tablet_metadata(int64_t tablet_id);

StatusOr<TabletMetadataIter> list_tablet_metadata(int64_t tablet_id);

Status delete_tablet_metadata(int64_t tablet_id, int64_t version);

// Use this function instead of get_tablet_metadata where you just need to check if tablet metadata exists
Status tablet_metadata_exists(int64_t tablet_id, int64_t version);

// Do not use this function except in a list dir
Status tablet_metadata_exists(const std::string& path);

Status put_txn_log(const TxnLog& log);

Status put_txn_log(const TxnLogPtr& log);
Expand Down Expand Up @@ -217,17 +216,18 @@ class TabletManager {
static std::string global_schema_cache_key(int64_t index_id);
static std::string tablet_schema_cache_key(int64_t tablet_id);
static std::string tablet_latest_metadata_cache_key(int64_t tablet_id);
static Status drop_local_cache(const std::string& path);

StatusOr<TabletSchemaPtr> load_and_parse_schema_file(const std::string& path);
StatusOr<TabletSchemaPtr> get_tablet_schema_by_id(int64_t tablet_id, int64_t schema_id);

StatusOr<TabletMetadataPtr> load_tablet_metadata(std::shared_ptr<FileSystem> fs,
const std::string& metadata_location, bool fill_cache);
Status put_tablet_metadata(const TabletMetadataPtr& metadata, const std::string& metadata_location);
StatusOr<TabletMetadataPtr> load_tablet_metadata(const std::string& metadata_location, bool fill_cache);
StatusOr<TabletMetadataPtr> load_tablet_metadata(const std::string& metadata_location, bool fill_cache,
int64_t expected_gtid, const std::shared_ptr<FileSystem>& fs);
StatusOr<TxnLogPtr> load_txn_log(const std::string& txn_log_location, bool fill_cache);
StatusOr<CombinedTxnLogPtr> load_combined_txn_log(const std::string& path, bool fill_cache);

private:
std::shared_ptr<LocationProvider> _location_provider;
std::unique_ptr<Metacache> _metacache;
std::unique_ptr<CompactionScheduler> _compaction_scheduler;
Expand Down
19 changes: 11 additions & 8 deletions be/src/storage/lake/transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ static void clear_remote_snapshot_async(TabletManager* tablet_mgr, int64_t table
files_to_delete->emplace_back(std::move(slog_path));
}

int64_t cal_new_base_version(int64_t tablet_id, TabletManager* tablet_mgr, int64_t base_version, int64_t new_version) {
int64_t cal_new_base_version(int64_t tablet_id, TabletManager* tablet_mgr, int64_t base_version, int64_t new_version,
const std::span<const TxnInfoPB>& txns) {
int64_t version = base_version;
auto metadata = tablet_mgr->get_latest_cached_tablet_metadata(tablet_id);
if (metadata != nullptr && metadata->version() <= new_version) {
Expand All @@ -92,7 +93,8 @@ int64_t cal_new_base_version(int64_t tablet_id, TabletManager* tablet_mgr, int64
if (index_version > version) {
// There is a possibility that the index version is newer than the version in remote storage.
// Check whether the index version exists in remote storage. If not, clear and rebuild the index.
auto res = tablet_mgr->tablet_metadata_exists(tablet_id, index_version);
auto res = tablet_mgr->get_tablet_metadata(tablet_id, index_version, true,
txns[index_version - base_version - 1].gtid());
if (res.ok()) {
version = index_version;
} else {
Expand Down Expand Up @@ -182,14 +184,14 @@ StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t t
return std::move(cached_new_metadata);
}

auto new_version_metadata_or_error = [=](Status error) -> StatusOr<TabletMetadataPtr> {
auto res = tablet_mgr->get_tablet_metadata(tablet_id, new_version);
// Returns the metadata of `new_version` if it can be loaded with the gtid of the last txn,
// otherwise returns the caller-supplied `error`.
auto new_version_metadata_or_error = [=](const Status& error) -> StatusOr<TabletMetadataPtr> {
    // The gtid must be passed as `expected_gtid` (4th parameter). Passing it third would
    // convert it to the `bool fill_cache` argument, leave expected_gtid at its default of 0,
    // and skip the gtid check. `true` keeps the default fill_cache behavior.
    auto res = tablet_mgr->get_tablet_metadata(tablet_id, new_version, true, txns.back().gtid());
    if (res.ok()) return res;
    return error;
};

int64_t ori_base_version = base_version;
int64_t new_base_version = cal_new_base_version(tablet_id, tablet_mgr, base_version, new_version);
int64_t new_base_version = cal_new_base_version(tablet_id, tablet_mgr, base_version, new_version, txns);
if (new_base_version > base_version) {
LOG(INFO) << "Base version has been adjusted. tablet_id=" << tablet_id << " base_version=" << base_version
<< " new_base_version=" << new_base_version << " new_version=" << new_version << " txns=" << txns;
Expand All @@ -203,7 +205,7 @@ StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t t
}

if (base_version == new_version) {
return tablet_mgr->get_tablet_metadata(tablet_id, base_version);
return tablet_mgr->get_tablet_metadata(tablet_id, new_version, true, txns.back().gtid());
}

// Read base version metadata
Expand Down Expand Up @@ -249,7 +251,7 @@ StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t t
// needs take compaction(force_publish=true) into consideration
// 1. duplicate publish in mode single
if (txns.size() == 1) {
auto res = tablet_mgr->get_tablet_metadata(tablet_id, new_version);
auto res = tablet_mgr->get_tablet_metadata(tablet_id, new_version, true, txns.back().gtid());
if (!res.ok() && res.status().is_not_found()) {
if (!txns[i].force_publish()) {
return txn_log_st.status();
Expand All @@ -270,7 +272,8 @@ StatusOr<TabletMetadataPtr> publish_version(TabletManager* tablet_mgr, int64_t t
// then txn3 is published successfully in BE and the txn_log of txn3 has been deleted, but FE does not get the response for some reason.
// If the publish mode then switches to batch,
// txn3, txn4 and txn5 will be published in one batch publish task, so txn3 should be skipped and only the txn_logs of txn4 and txn5 applied.
auto missig_txn_log_meta = tablet_mgr->get_tablet_metadata(tablet_id, base_version + 1);
auto missig_txn_log_meta =
tablet_mgr->get_tablet_metadata(tablet_id, base_version + 1, true, txns[0].gtid());
if (missig_txn_log_meta.status().is_not_found()) {
if (txns[i].force_publish()) {
// can not change `base_metadata` below, just use old one
Expand Down
3 changes: 1 addition & 2 deletions be/src/storage/lake/update_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -644,10 +644,9 @@ Status UpdateManager::get_del_vec(const TabletSegmentId& tsid, int64_t version,
// get delvec in meta file
Status UpdateManager::get_del_vec_in_meta(const TabletSegmentId& tsid, int64_t meta_ver, bool fill_cache,
DelVector* delvec) {
std::string filepath = _tablet_mgr->tablet_metadata_location(tsid.tablet_id, meta_ver);
LakeIOOptions lake_io_opts;
lake_io_opts.fill_data_cache = fill_cache;
ASSIGN_OR_RETURN(auto metadata, _tablet_mgr->get_tablet_metadata(filepath, fill_cache));
ASSIGN_OR_RETURN(auto metadata, _tablet_mgr->get_tablet_metadata(tsid.tablet_id, meta_ver, fill_cache));
RETURN_IF_ERROR(lake::get_del_vec(_tablet_mgr, *metadata, tsid.segment_id, fill_cache, lake_io_opts, delvec));
return Status::OK();
}
Expand Down
Loading

0 comments on commit 1a590b0

Please sign in to comment.