Skip to content

Commit

Permalink
Refactor code (#1936)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

- Version info is printed after welcome message.
- Wrong config file path will not use default config, an error is
issued.

### Type of change

- [x] Refactoring

---------

Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN authored Sep 29, 2024
1 parent 6386249 commit 377ddcd
Show file tree
Hide file tree
Showing 21 changed files with 114 additions and 48 deletions.
5 changes: 2 additions & 3 deletions benchmark/local_infinity/fulltext/fulltext_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import stl;
import third_party;
import compilation_config;
import local_file_system;
import profiler;
import infinity;

Expand All @@ -45,6 +44,7 @@ import match_expr;
import function_expr;
import search_expr;
import column_expr;
import virtual_storage;

using namespace infinity;

Expand Down Expand Up @@ -123,8 +123,7 @@ void BenchmarkImport(SharedPtr<Infinity> infinity,
const String &db_name,
const String &table_name,
const String &import_from) {
LocalFileSystem fs;
if (!fs.Exists(import_from)) {
if (!VirtualStorage::ExistsLocal(import_from)) {
LOG_ERROR(fmt::format("Data file doesn't exist: {}", import_from));
return;
}
Expand Down
7 changes: 3 additions & 4 deletions benchmark/local_infinity/infinity_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import stl;
import infinity;

import profiler;
import local_file_system;
import third_party;

import query_options;
Expand All @@ -43,6 +42,7 @@ import knn_expr;
import column_def;
import statement_common;
import data_type;
import virtual_storage;

using namespace infinity;

Expand Down Expand Up @@ -84,8 +84,7 @@ int main() {

String path = "/var/infinity";

LocalFileSystem fs;
fs.CleanupDirectory(path);
VirtualStorage::CleanupDirectoryLocal(path);

Infinity::LocalInit(path);

Expand Down Expand Up @@ -374,7 +373,7 @@ int main() {
auto r2 = infinity->CreateTable(db_name, table_name, std::move(column_defs), std::vector<TableConstraint *>{}, std::move(create_tb_options));

std::string sift_base_path = std::string(test_data_path()) + "/benchmark/sift/base.fvecs";
if (!fs.Exists(sift_base_path)) {
if (!VirtualStorage::ExistsLocal(sift_base_path)) {
std::cout << "File: " << sift_base_path << " doesn't exist" << std::endl;
break;
}
Expand Down
9 changes: 4 additions & 5 deletions benchmark/local_infinity/knn/knn_import_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import infinity;
import logical_type;

import profiler;
import local_file_system;
import third_party;
import logical_node_type;
import embedding_info;
Expand All @@ -38,6 +37,7 @@ import knn_expr;
import column_def;
import statement_common;
import data_type;
import virtual_storage;

using namespace infinity;

Expand All @@ -59,11 +59,10 @@ int main(int argc, char *argv[]) {
data_path = std::string(argv[3]);
}

LocalFileSystem fs;
if (fs.Exists(data_path)) {
if (VirtualStorage::ExistsLocal(data_path)) {
std::cout << "Data path: " << data_path << " is already existed." << std::endl;
} else {
fs.CreateDirectory(data_path);
VirtualStorage::MakeDirectoryLocal(data_path);
std::cout << "Data path: " << data_path << " is created." << std::endl;
}

Expand Down Expand Up @@ -114,7 +113,7 @@ int main(int argc, char *argv[]) {

// auto [ table, status2 ] = data_base->GetTable(table_name);

if (!fs.Exists(base_path)) {
if (!VirtualStorage::ExistsLocal(base_path)) {
std::cout << "File: " << base_path << " doesn't exist" << std::endl;
break;
}
Expand Down
20 changes: 11 additions & 9 deletions src/bin/infinity_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,17 @@ auto main(int argc, char **argv) -> int {
"| | | |\\ | | | | | | |\\ | | | | | | | \n"
"|__| |__| \\__| |__| |__| |__| \\__| |__| |__| |__| \n");

fmt::print("Release: {}.{}.{} build on {} with {} mode from branch: {}, commit-id: {}\n",
version_major(),
version_minor(),
version_patch(),
current_system_time(),
build_type(),
git_branch_name(),
git_commit_id());

fmt::print("Currently enabled SIMD support: {}\n", fmt::join(GetSupportedSimdTypesList(), ", "));

CLI::App app{"infinity_main"};

SharedPtr<String> config_path = MakeShared<String>();
Expand All @@ -168,16 +179,7 @@ auto main(int argc, char **argv) -> int {

InfinityContext::instance().config()->PrintAll();

fmt::print("Release: {}.{}.{} build on {} with {} mode from branch: {}, commit-id: {}\n",
version_major(),
version_minor(),
version_patch(),
current_system_time(),
build_type(),
git_branch_name(),
git_commit_id());

fmt::print("Currently enabled SIMD support: {}\n", fmt::join(GetSupportedSimdTypesList(), ", "));

auto start_thrift_servers = [&]() {
u32 thrift_server_port = InfinityContext::instance().config()->ClientPort();
Expand Down
8 changes: 3 additions & 5 deletions src/main/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,14 @@ Status Config::ParseTimeInfo(const String &time_info, i64 &time_seconds) {
return Status::OK();
}

// extern SharedPtr<spdlogger> infinity_logger;

Status Config::Init(const SharedPtr<String> &config_path, DefaultConfig *default_config) {
toml::table config_toml{};
if (config_path.get() == nullptr || config_path->empty() || !VirtualStorage::ExistsLocal(std::filesystem::absolute(*config_path))) {
if (config_path.get() == nullptr || config_path->empty()) {
// fmt::print("No config file is given, use default configs.\n");
;
fmt::print("No config file is given, use default configs.\n");
} else {
fmt::print("Config file: {} is not existent.\n", *config_path);
fmt::print("Config file: {} is not found.\n", *config_path);
return Status::NotFound(fmt::format("Config file: {} not found", *config_path));
}

Status status;
Expand Down
1 change: 1 addition & 0 deletions src/storage/io/abstract_file_handle.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public:
virtual Status Append(const String &buffer, u64 nbytes) = 0;
virtual Tuple<SizeT, Status> Read(char *buffer, u64 nbytes) = 0;
virtual Tuple<SizeT, Status> Read(String &buffer, u64 nbytes) = 0;
virtual Status Seek(u64 nbytes) = 0;
virtual Status Download(const String& url, const String& path) = 0; // Download from url to path
virtual Status Upload(const String& path, const String& url) = 0; // Upload from path to url
virtual SizeT FileSize() = 0;
Expand Down
8 changes: 8 additions & 0 deletions src/storage/io/file_handle/local_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ Tuple<SizeT, Status> LocalFile::Read(String &buffer, u64 nbytes) {
return {read_n, Status::OK()};
}

Status LocalFile::Seek(u64 nbytes) {
if ((off_t)-1 == lseek(fd_, nbytes, SEEK_SET)) {
String error_message = fmt::format("Can't seek file: {}: {}", path_, strerror(errno));
UnrecoverableError(error_message);
}
return Status::OK();
}

Status LocalFile::Download(const String &url, const String &path) { return Status::OK(); }

Status LocalFile::Upload(const String &path, const String &url) { return Status::OK(); }
Expand Down
1 change: 1 addition & 0 deletions src/storage/io/file_handle/local_file.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public:
Status Append(const String &buffer, u64 nbytes) final;
Tuple<SizeT, Status> Read(char *buffer, u64 nbytes) final;
Tuple<SizeT, Status> Read(String &buffer, u64 nbytes) final;
Status Seek(u64 nbytes) final;
Status Download(const String &url, const String &path) final;
Status Upload(const String &path, const String &url) final;
SizeT FileSize() final;
Expand Down
2 changes: 2 additions & 0 deletions src/storage/io/file_handle/minio_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ Tuple<SizeT, Status> MinioFile::Read(String &buffer, u64 nbytes) {
return {readen, Status::OK()};
}

Status MinioFile::Seek(u64 nbytes) { return Status::OK(); }

Status MinioFile::Download(const String &url, const String &path) { return Status::OK(); }

Status MinioFile::Upload(const String &path, const String &url) { return Status::OK(); }
Expand Down
1 change: 1 addition & 0 deletions src/storage/io/file_handle/minio_file.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public:
Status Append(const String &buffer, u64 nbytes) final;
Tuple<SizeT, Status> Read(char *buffer, u64 nbytes) final;
Tuple<SizeT, Status> Read(String &buffer, u64 nbytes) final;
Status Seek(u64 nbytes) final;
Status Download(const String &url, const String &path) final;
Status Upload(const String &path, const String &url) final;
SizeT FileSize() final;
Expand Down
2 changes: 2 additions & 0 deletions src/storage/io/file_handle/object_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ Tuple<SizeT, Status> ObjectFile::Read(char *buffer, u64 nbytes) { return {0, Sta

Tuple<SizeT, Status> ObjectFile::Read(String &buffer, u64 nbytes) { return {0, Status::OK()}; }

Status ObjectFile::Seek(u64 nbytes) { return Status::OK(); }

Status ObjectFile::Download(const String &url, const String &path) { return Status::OK(); }

Status ObjectFile::Upload(const String &path, const String &url) { return Status::OK(); }
Expand Down
1 change: 1 addition & 0 deletions src/storage/io/file_handle/object_file.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public:
Status Append(const String &buffer, u64 nbytes) override;
Tuple<SizeT, Status> Read(char *buffer, u64 nbytes) override;
Tuple<SizeT, Status> Read(String &buffer, u64 nbytes) override;
Status Seek(u64 nbytes) override;
Status Download(const String& url, const String& path) override;
Status Upload(const String& path, const String& url) override;
SizeT FileSize() override;
Expand Down
21 changes: 19 additions & 2 deletions src/storage/io/virtual_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ Status VirtualStorage::RenameLocal(const String &old_path, const String &new_pat
return Status::OK();
}

Status VirtualStorage::TruncateLocal(const String& file_name, SizeT new_length) {
Status VirtualStorage::TruncateLocal(const String &file_name, SizeT new_length) {
if (!std::filesystem::path(file_name).is_absolute()) {
String error_message = fmt::format("{} isn't absolute path.", file_name);
UnrecoverableError(error_message);
Expand All @@ -294,7 +294,7 @@ Status VirtualStorage::TruncateLocal(const String& file_name, SizeT new_length)
return Status::OK();
}

Status VirtualStorage::MergeLocal(const String& dst_path, const String& src_path) {
Status VirtualStorage::MergeLocal(const String &dst_path, const String &src_path) {
if (!std::filesystem::path(dst_path).is_absolute()) {
String error_message = fmt::format("{} isn't absolute path.", dst_path);
UnrecoverableError(error_message);
Expand Down Expand Up @@ -327,4 +327,21 @@ Status VirtualStorage::MergeLocal(const String& dst_path, const String& src_path
return Status::OK();
}

Tuple<Vector<SharedPtr<DirEntry>>, Status> VirtualStorage::ListDirectoryLocal(const String &path) {
if (!std::filesystem::path(path).is_absolute()) {
String error_message = fmt::format("{} isn't absolute path.", path);
UnrecoverableError(error_message);
}
Path dir_path(path);
if (!is_directory(dir_path)) {
String error_message = fmt::format("{} isn't a directory", path);
UnrecoverableError(error_message);
}

Vector<SharedPtr<DirEntry>> file_array;
std::ranges::for_each(std::filesystem::directory_iterator{path},
[&](const auto &dir_entry) { file_array.emplace_back(MakeShared<DirEntry>(dir_entry)); });
return {file_array, Status::OK()};
}

} // namespace infinity
1 change: 1 addition & 0 deletions src/storage/io/virtual_storage.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public:
static Status RenameLocal(const String& old_path, const String& new_path);
static Status TruncateLocal(const String& file_name, SizeT new_length);
static Status MergeLocal(const String& dst_file, const String& src_file);
static Tuple<Vector<SharedPtr<DirEntry>>, Status> ListDirectoryLocal(const String& path);
private:
StorageType storage_type_{StorageType::kLocal};
UniquePtr<LocalDiskCache> local_disk_cache_{};
Expand Down
21 changes: 14 additions & 7 deletions src/storage/wal/log_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ module;
module log_file;

import stl;
import local_file_system;
import virtual_storage;
import third_party;
import infinity_exception;
import logger;
import default_values;
import infinity_context;
import status;
import local_file_system;

namespace infinity {

Expand Down Expand Up @@ -150,11 +152,17 @@ Pair<Vector<FullCatalogFileInfo>, Vector<DeltaCatalogFileInfo>> CatalogFile::Par
}

Pair<Optional<TempWalFileInfo>, Vector<WalFileInfo>> WalFile::ParseWalFilenames(const String &wal_dir) {
LocalFileSystem fs;
if (!fs.Exists(wal_dir)) {

if (!VirtualStorage::ExistsLocal(wal_dir)) {
return {None, Vector<WalFileInfo>{}};
}
const auto &entries = fs.ListDirectory(wal_dir);

auto [entries, status] = VirtualStorage::ListDirectoryLocal(wal_dir);
if(!status.ok()) {
LOG_CRITICAL(status.message());
UnrecoverableError(status.message());
}

if (entries.empty()) {
return {None, Vector<WalFileInfo>{}};
}
Expand Down Expand Up @@ -208,14 +216,13 @@ String WalFile::TempWalFilename() { return String(WAL_FILE_TEMP_FILE); }
// * @brief Gc the old wal files.
// * Only delete the wal.log.* files. the wal.log file is current wal file.
// * Check if the wal.log.* files are too old.
// * if * is little than the max_commit_ts, we will delete it.
// * if * is less than the max_commit_ts, we will delete it.
// */
void WalFile::RecycleWalFile(TxnTimeStamp max_commit_ts, const String &wal_dir) {
auto [cur_wal_info, wal_infos] = ParseWalFilenames(wal_dir);
for (const auto &wal_info : wal_infos) {
if (wal_info.max_commit_ts_ <= max_commit_ts) {
LocalFileSystem fs;
fs.DeleteFile(wal_info.path_);
VirtualStorage::DeleteFileLocal(wal_info.path_);
LOG_INFO(fmt::format("WalManager::Checkpoint delete wal file: {}", wal_info.path_));
}
}
Expand Down
1 change: 0 additions & 1 deletion src/unit_test/storage/meta/cleanup_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import global_resource_usage;
import third_party;
import catalog;
import base_entry;
import local_file_system;
import status;
import logger;
import third_party;
Expand Down
1 change: 0 additions & 1 deletion src/unit_test/storage/meta/db_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import global_resource_usage;
import third_party;
import catalog;
import base_entry;
import local_file_system;
import status;
import logger;
import third_party;
Expand Down
23 changes: 16 additions & 7 deletions src/unit_test/storage/persistence/persistence_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import base_test;
import stl;
import persistence_manager;
import local_file_system;
import virtual_storage;
import virtual_storage_type;
import abstract_file_handle;
import file_system_type;
import third_party;
import persist_result_handler;
Expand Down Expand Up @@ -41,15 +43,22 @@ void PersistenceManagerTest::CheckObjData(const String& local_file_path, const S
SizeT obj_file_size = fs::file_size(obj_fp);
ASSERT_LE(obj_file_size, ObjSizeLimit);

LocalFileSystem local_fs;
auto [file_handler, status] = local_fs.OpenFile(obj_path, FileFlags::READ_FLAG, FileLockType::kReadLock);
ASSERT_TRUE(status.ok());
local_fs.Seek(*file_handler, obj_addr.part_offset_);
VirtualStorage virtual_storage;
Map<String, String> configs;
virtual_storage.Init(StorageType::kLocal, configs);
auto [pm_file_handle, status] = virtual_storage.BuildFileHandle();
EXPECT_TRUE(status.ok());

status = pm_file_handle->Open(obj_path, FileAccessMode::kRead);
EXPECT_TRUE(status.ok());
status = pm_file_handle->Seek(obj_addr.part_offset_);
EXPECT_TRUE(status.ok());
auto file_size = obj_addr.part_size_;
auto buffer = std::make_unique<char[]>(file_size);
local_fs.Read(*file_handler, buffer.get(), file_size);
auto [nread, read_status] = pm_file_handle->Read(buffer.get(), file_size);
EXPECT_TRUE(read_status.ok());
ASSERT_EQ(String(buffer.get(), file_size), data);
local_fs.Close(*file_handler);
pm_file_handle->Close();

pm_->PutObjCache(local_file_path);
}
Expand Down
Loading

0 comments on commit 377ddcd

Please sign in to comment.