diff --git a/benchmark/local_infinity/fulltext/fulltext_benchmark.cpp b/benchmark/local_infinity/fulltext/fulltext_benchmark.cpp index 4353c962ba..c0b51d8a63 100644 --- a/benchmark/local_infinity/fulltext/fulltext_benchmark.cpp +++ b/benchmark/local_infinity/fulltext/fulltext_benchmark.cpp @@ -26,7 +26,6 @@ import stl; import third_party; import compilation_config; -import local_file_system; import profiler; import infinity; @@ -45,6 +44,7 @@ import match_expr; import function_expr; import search_expr; import column_expr; +import virtual_storage; using namespace infinity; @@ -123,8 +123,7 @@ void BenchmarkImport(SharedPtr infinity, const String &db_name, const String &table_name, const String &import_from) { - LocalFileSystem fs; - if (!fs.Exists(import_from)) { + if (!VirtualStorage::ExistsLocal(import_from)) { LOG_ERROR(fmt::format("Data file doesn't exist: {}", import_from)); return; } diff --git a/benchmark/local_infinity/infinity_benchmark.cpp b/benchmark/local_infinity/infinity_benchmark.cpp index 662f9a0ec0..aba397bf41 100644 --- a/benchmark/local_infinity/infinity_benchmark.cpp +++ b/benchmark/local_infinity/infinity_benchmark.cpp @@ -25,7 +25,6 @@ import stl; import infinity; import profiler; -import local_file_system; import third_party; import query_options; @@ -43,6 +42,7 @@ import knn_expr; import column_def; import statement_common; import data_type; +import virtual_storage; using namespace infinity; @@ -84,8 +84,7 @@ int main() { String path = "/var/infinity"; - LocalFileSystem fs; - fs.CleanupDirectory(path); + VirtualStorage::CleanupDirectoryLocal(path); Infinity::LocalInit(path); @@ -374,7 +373,7 @@ int main() { auto r2 = infinity->CreateTable(db_name, table_name, std::move(column_defs), std::vector{}, std::move(create_tb_options)); std::string sift_base_path = std::string(test_data_path()) + "/benchmark/sift/base.fvecs"; - if (!fs.Exists(sift_base_path)) { + if (!VirtualStorage::ExistsLocal(sift_base_path)) { std::cout << "File: " << sift_base_path << " doesn't exist" << std::endl; break; } diff --git a/benchmark/local_infinity/knn/knn_import_benchmark.cpp b/benchmark/local_infinity/knn/knn_import_benchmark.cpp index 66c0817b00..f33a6e8a70 100644 --- a/benchmark/local_infinity/knn/knn_import_benchmark.cpp +++ b/benchmark/local_infinity/knn/knn_import_benchmark.cpp @@ -26,7 +26,6 @@ import infinity; import logical_type; import profiler; -import local_file_system; import third_party; import logical_node_type; import embedding_info; @@ -38,6 +37,7 @@ import knn_expr; import column_def; import statement_common; import data_type; +import virtual_storage; using namespace infinity; @@ -59,11 +59,10 @@ int main(int argc, char *argv[]) { data_path = std::string(argv[3]); } - LocalFileSystem fs; - if (fs.Exists(data_path)) { + if (VirtualStorage::ExistsLocal(data_path)) { std::cout << "Data path: " << data_path << " is already existed." << std::endl; } else { - fs.CreateDirectory(data_path); + VirtualStorage::MakeDirectoryLocal(data_path); std::cout << "Data path: " << data_path << " is created." << std::endl; } @@ -114,7 +113,7 @@ int main(int argc, char *argv[]) { // auto [ table, status2 ] = data_base->GetTable(table_name); - if (!fs.Exists(base_path)) { + if (!VirtualStorage::ExistsLocal(base_path)) { std::cout << "File: " << base_path << " doesn't exist" << std::endl; break; } diff --git a/src/bin/infinity_main.cpp b/src/bin/infinity_main.cpp index 3fe663e08d..eb0dc77e5d 100644 --- a/src/bin/infinity_main.cpp +++ b/src/bin/infinity_main.cpp @@ -152,6 +152,17 @@ auto main(int argc, char **argv) -> int { "| | | |\\ | | | | | | |\\ | | | | | | | \n" "|__| |__| \\__| |__| |__| |__| \\__| |__| |__| |__| \n"); + fmt::print("Release: {}.{}.{} build on {} with {} mode from branch: {}, commit-id: {}\n", + version_major(), + version_minor(), + version_patch(), + current_system_time(), + build_type(), + git_branch_name(), + git_commit_id()); + + fmt::print("Currently enabled SIMD support: {}\n", fmt::join(GetSupportedSimdTypesList(), ", ")); + CLI::App app{"infinity_main"}; SharedPtr config_path = MakeShared(); @@ -168,16 +179,7 @@ auto main(int argc, char **argv) -> int { InfinityContext::instance().config()->PrintAll(); - fmt::print("Release: {}.{}.{} build on {} with {} mode from branch: {}, commit-id: {}\n", - version_major(), - version_minor(), - version_patch(), - current_system_time(), - build_type(), - git_branch_name(), - git_commit_id()); - fmt::print("Currently enabled SIMD support: {}\n", fmt::join(GetSupportedSimdTypesList(), ", ")); auto start_thrift_servers = [&]() { u32 thrift_server_port = InfinityContext::instance().config()->ClientPort(); diff --git a/src/main/config.cpp b/src/main/config.cpp index 881df92e66..3abcab418c 100644 --- a/src/main/config.cpp +++ b/src/main/config.cpp @@ -114,16 +114,14 @@ Status Config::ParseTimeInfo(const String &time_info, i64 &time_seconds) { return Status::OK(); } -// extern SharedPtr infinity_logger; - Status Config::Init(const SharedPtr &config_path, DefaultConfig *default_config) { toml::table config_toml{}; if (config_path.get() == nullptr || config_path->empty() || !VirtualStorage::ExistsLocal(std::filesystem::absolute(*config_path))) { if (config_path.get() == nullptr || config_path->empty()) { -// fmt::print("No config file is given, use default configs.\n"); - ; + fmt::print("No config file is given, use default configs.\n"); } else { - fmt::print("Config file: {} is not existent.\n", *config_path); + fmt::print("Config file: {} is not found.\n", *config_path); + return Status::NotFound(fmt::format("Config file: {} not found", *config_path)); } Status status; diff --git a/src/storage/io/abstract_file_handle.cppm b/src/storage/io/abstract_file_handle.cppm index 04632a60c5..07faa5764a 100644 --- a/src/storage/io/abstract_file_handle.cppm +++ b/src/storage/io/abstract_file_handle.cppm @@ -36,6 +36,7 @@ public: virtual Status Append(const String &buffer, u64 nbytes) = 0; virtual Tuple Read(char *buffer, u64 nbytes) = 0; virtual Tuple Read(String &buffer, u64 nbytes) = 0; + virtual Status Seek(u64 nbytes) = 0; virtual Status Download(const String& url, const String& path) = 0; // Download from url to path virtual Status Upload(const String& path, const String& url) = 0; // Upload from path to url virtual SizeT FileSize() = 0; diff --git a/src/storage/io/file_handle/local_file.cpp b/src/storage/io/file_handle/local_file.cpp index ccc62a526c..038546b687 100644 --- a/src/storage/io/file_handle/local_file.cpp +++ b/src/storage/io/file_handle/local_file.cpp @@ -144,6 +144,14 @@ Tuple LocalFile::Read(String &buffer, u64 nbytes) { return {read_n, Status::OK()}; } +Status LocalFile::Seek(u64 nbytes) { + if ((off_t)-1 == lseek(fd_, nbytes, SEEK_SET)) { + String error_message = fmt::format("Can't seek file: {}: {}", path_, strerror(errno)); + UnrecoverableError(error_message); + } + return Status::OK(); +} + Status LocalFile::Download(const String &url, const String &path) { return Status::OK(); } Status LocalFile::Upload(const String &path, const String &url) { return Status::OK(); } diff --git a/src/storage/io/file_handle/local_file.cppm b/src/storage/io/file_handle/local_file.cppm index 7379b36e12..d43d080ad5 100644 --- a/src/storage/io/file_handle/local_file.cppm +++ b/src/storage/io/file_handle/local_file.cppm @@ -35,6 +35,7 @@ public: Status Append(const String &buffer, u64 nbytes) final; Tuple Read(char *buffer, u64 nbytes) final; Tuple Read(String &buffer, u64 nbytes) final; + Status Seek(u64 nbytes) final; Status Download(const String &url, const String &path) final; Status Upload(const String &path, const String &url) final; SizeT FileSize() final; diff --git a/src/storage/io/file_handle/minio_file.cpp b/src/storage/io/file_handle/minio_file.cpp index a2643fb0e4..02526527b9 100644 --- a/src/storage/io/file_handle/minio_file.cpp +++ b/src/storage/io/file_handle/minio_file.cpp @@ -202,6 +202,8 @@ Tuple MinioFile::Read(String &buffer, u64 nbytes) { return {readen, Status::OK()}; } +Status MinioFile::Seek(u64 nbytes) { return Status::OK(); } + Status MinioFile::Download(const String &url, const String &path) { return Status::OK(); } Status MinioFile::Upload(const String &path, const String &url) { return Status::OK(); } diff --git a/src/storage/io/file_handle/minio_file.cppm b/src/storage/io/file_handle/minio_file.cppm index 9aa06e5e9b..886fd02042 100644 --- a/src/storage/io/file_handle/minio_file.cppm +++ b/src/storage/io/file_handle/minio_file.cppm @@ -41,6 +41,7 @@ public: Status Append(const String &buffer, u64 nbytes) final; Tuple Read(char *buffer, u64 nbytes) final; Tuple Read(String &buffer, u64 nbytes) final; + Status Seek(u64 nbytes) final; Status Download(const String &url, const String &path) final; Status Upload(const String &path, const String &url) final; SizeT FileSize() final; diff --git a/src/storage/io/file_handle/object_file.cpp b/src/storage/io/file_handle/object_file.cpp index 00a7e5960d..7eeebf1351 100644 --- a/src/storage/io/file_handle/object_file.cpp +++ b/src/storage/io/file_handle/object_file.cpp @@ -37,6 +37,8 @@ Tuple ObjectFile::Read(char *buffer, u64 nbytes) { return {0, Sta Tuple ObjectFile::Read(String &buffer, u64 nbytes) { return {0, Status::OK()}; } +Status ObjectFile::Seek(u64 nbytes) { return Status::OK(); } + Status ObjectFile::Download(const String &url, const String &path) { return Status::OK(); } Status ObjectFile::Upload(const String &path, const String &url) { return Status::OK(); } diff --git a/src/storage/io/file_handle/object_file.cppm b/src/storage/io/file_handle/object_file.cppm index ae8cdc0626..287c360206 100644 --- a/src/storage/io/file_handle/object_file.cppm +++ b/src/storage/io/file_handle/object_file.cppm @@ -35,6 +35,7 @@ public: Status Append(const String &buffer, u64 nbytes) override; Tuple Read(char *buffer, u64 nbytes) override; Tuple Read(String &buffer, u64 nbytes) override; + Status Seek(u64 nbytes) override; Status Download(const String& url, const String& path) override; Status Upload(const String& path, const String& url) override; SizeT FileSize() override; diff --git a/src/storage/io/virtual_storage.cpp b/src/storage/io/virtual_storage.cpp index cf561e7671..6e9ebd6140 100644 --- a/src/storage/io/virtual_storage.cpp +++ b/src/storage/io/virtual_storage.cpp @@ -280,7 +280,7 @@ Status VirtualStorage::RenameLocal(const String &old_path, const String &new_pat return Status::OK(); } -Status VirtualStorage::TruncateLocal(const String& file_name, SizeT new_length) { +Status VirtualStorage::TruncateLocal(const String &file_name, SizeT new_length) { if (!std::filesystem::path(file_name).is_absolute()) { String error_message = fmt::format("{} isn't absolute path.", file_name); UnrecoverableError(error_message); @@ -294,7 +294,7 @@ Status VirtualStorage::TruncateLocal(const String& file_name, SizeT new_length) return Status::OK(); } -Status VirtualStorage::MergeLocal(const String& dst_path, const String& src_path) { +Status VirtualStorage::MergeLocal(const String &dst_path, const String &src_path) { if (!std::filesystem::path(dst_path).is_absolute()) { String error_message = fmt::format("{} isn't absolute path.", dst_path); UnrecoverableError(error_message); @@ -327,4 +327,21 @@ Status VirtualStorage::MergeLocal(const String& dst_path, const String& src_path return Status::OK(); } +Tuple>, Status> VirtualStorage::ListDirectoryLocal(const String &path) { + if (!std::filesystem::path(path).is_absolute()) { + String error_message = fmt::format("{} isn't absolute path.", path); + UnrecoverableError(error_message); + } + Path dir_path(path); + if (!is_directory(dir_path)) { + String error_message = fmt::format("{} isn't a directory", path); + UnrecoverableError(error_message); + } + + Vector> file_array; + std::ranges::for_each(std::filesystem::directory_iterator{path}, + [&](const auto &dir_entry) { file_array.emplace_back(MakeShared(dir_entry)); }); + return {file_array, Status::OK()}; +} + } // namespace infinity diff --git a/src/storage/io/virtual_storage.cppm b/src/storage/io/virtual_storage.cppm index cb552c513c..fc397957b1 100644 --- a/src/storage/io/virtual_storage.cppm +++ b/src/storage/io/virtual_storage.cppm @@ -62,6 +62,7 @@ public: static Status RenameLocal(const String& old_path, const String& new_path); static Status TruncateLocal(const String& file_name, SizeT new_length); static Status MergeLocal(const String& dst_file, const String& src_file); + static Tuple>, Status> ListDirectoryLocal(const String& path); private: StorageType storage_type_{StorageType::kLocal}; UniquePtr local_disk_cache_{}; diff --git a/src/storage/wal/log_file.cpp b/src/storage/wal/log_file.cpp index 22c08e4dc8..c6ea948016 100644 --- a/src/storage/wal/log_file.cpp +++ b/src/storage/wal/log_file.cpp @@ -19,12 +19,14 @@ module; module log_file; import stl; -import local_file_system; +import virtual_storage; import third_party; import infinity_exception; import logger; import default_values; import infinity_context; +import status; +import local_file_system; namespace infinity { @@ -150,11 +152,17 @@ Pair, Vector> CatalogFile::Par } Pair, Vector> WalFile::ParseWalFilenames(const String &wal_dir) { - LocalFileSystem fs; - if (!fs.Exists(wal_dir)) { + + if (!VirtualStorage::ExistsLocal(wal_dir)) { return {None, Vector{}}; } - const auto &entries = fs.ListDirectory(wal_dir); + + auto [entries, status] = VirtualStorage::ListDirectoryLocal(wal_dir); + if(!status.ok()) { + LOG_CRITICAL(status.message()); + UnrecoverableError(status.message()); + } + if (entries.empty()) { return {None, Vector{}}; } @@ -208,14 +216,13 @@ String WalFile::TempWalFilename() { return String(WAL_FILE_TEMP_FILE); } // * @brief Gc the old wal files. // * Only delete the wal.log.* files. the wal.log file is current wal file. // * Check if the wal.log.* files are too old. -// * if * is little than the max_commit_ts, we will delete it. +// * if * is less than the max_commit_ts, we will delete it. // */ void WalFile::RecycleWalFile(TxnTimeStamp max_commit_ts, const String &wal_dir) { auto [cur_wal_info, wal_infos] = ParseWalFilenames(wal_dir); for (const auto &wal_info : wal_infos) { if (wal_info.max_commit_ts_ <= max_commit_ts) { - LocalFileSystem fs; - fs.DeleteFile(wal_info.path_); + VirtualStorage::DeleteFileLocal(wal_info.path_); LOG_INFO(fmt::format("WalManager::Checkpoint delete wal file: {}", wal_info.path_)); } } diff --git a/src/unit_test/storage/meta/cleanup_scanner.cpp b/src/unit_test/storage/meta/cleanup_scanner.cpp index dbcef7995f..cf9429a7ae 100644 --- a/src/unit_test/storage/meta/cleanup_scanner.cpp +++ b/src/unit_test/storage/meta/cleanup_scanner.cpp @@ -10,7 +10,6 @@ import global_resource_usage; import third_party; import catalog; import base_entry; -import local_file_system; import status; import logger; import third_party; diff --git a/src/unit_test/storage/meta/db_meta.cpp b/src/unit_test/storage/meta/db_meta.cpp index a378b39048..c6478ae3fa 100644 --- a/src/unit_test/storage/meta/db_meta.cpp +++ b/src/unit_test/storage/meta/db_meta.cpp @@ -10,7 +10,6 @@ import global_resource_usage; import third_party; import catalog; import base_entry; -import local_file_system; import status; import logger; import third_party; diff --git a/src/unit_test/storage/persistence/persistence_manager.cpp b/src/unit_test/storage/persistence/persistence_manager.cpp index c23e812bde..61f0bf4ee9 100644 --- a/src/unit_test/storage/persistence/persistence_manager.cpp +++ b/src/unit_test/storage/persistence/persistence_manager.cpp @@ -2,7 +2,9 @@ import base_test; import stl; import persistence_manager; -import local_file_system; +import virtual_storage; +import virtual_storage_type; +import abstract_file_handle; import file_system_type; import third_party; import persist_result_handler; @@ -41,15 +43,22 @@ void PersistenceManagerTest::CheckObjData(const String& local_file_path, const S SizeT obj_file_size = fs::file_size(obj_fp); ASSERT_LE(obj_file_size, ObjSizeLimit); - LocalFileSystem local_fs; - auto [file_handler, status] = local_fs.OpenFile(obj_path, FileFlags::READ_FLAG, FileLockType::kReadLock); - ASSERT_TRUE(status.ok()); - local_fs.Seek(*file_handler, obj_addr.part_offset_); + VirtualStorage virtual_storage; + Map configs; + virtual_storage.Init(StorageType::kLocal, configs); + auto [pm_file_handle, status] = virtual_storage.BuildFileHandle(); + EXPECT_TRUE(status.ok()); + + status = pm_file_handle->Open(obj_path, FileAccessMode::kRead); + EXPECT_TRUE(status.ok()); + status = pm_file_handle->Seek(obj_addr.part_offset_); + EXPECT_TRUE(status.ok()); auto file_size = obj_addr.part_size_; auto buffer = std::make_unique(file_size); - local_fs.Read(*file_handler, buffer.get(), file_size); + auto [nread, read_status] = pm_file_handle->Read(buffer.get(), file_size); + EXPECT_TRUE(read_status.ok()); ASSERT_EQ(String(buffer.get(), file_size), data); - local_fs.Close(*file_handler); + pm_file_handle->Close(); pm_->PutObjCache(local_file_path); } diff --git a/src/unit_test/storage/wal/recycle_log.cpp b/src/unit_test/storage/wal/recycle_log.cpp index 520d83bbea..436564861a 100644 --- a/src/unit_test/storage/wal/recycle_log.cpp +++ b/src/unit_test/storage/wal/recycle_log.cpp @@ -27,7 +27,6 @@ import log_file; import config; import bg_task; import background_process; -import local_file_system; import default_values; import status; import logger; @@ -65,7 +64,6 @@ TEST_P(RecycleLogTest, recycle_wal_after_delta_checkpoint) { BGTaskProcessor *bg_processor = storage->bg_processor(); const String &wal_dir = config->WALDir(); - LocalFileSystem fs; { time_t start = time(nullptr); while (true) { @@ -161,7 +159,6 @@ TEST_P(RecycleLogTest, recycle_wal_after_full_checkpoint) { const String &wal_dir = config->WALDir(); const String &catalog_dir = config->DataDir() + "/" + String(CATALOG_FILE_DIR); - LocalFileSystem fs; for (int i = 0; i < 2; ++i) { // create 2 delta catalog file time_t start = time(nullptr); while (true) { diff --git a/src/unit_test/storage/wal/repeat_replay.cpp b/src/unit_test/storage/wal/repeat_replay.cpp index abd848d8fb..5ce96e2e10 100644 --- a/src/unit_test/storage/wal/repeat_replay.cpp +++ b/src/unit_test/storage/wal/repeat_replay.cpp @@ -27,7 +27,6 @@ import log_file; import config; import bg_task; import background_process; -import local_file_system; import default_values; import status; import logger; diff --git a/test/data/config/test_close_bgtask_silent_vfs_off.toml b/test/data/config/test_close_bgtask_silent_vfs_off.toml new file mode 100644 index 0000000000..0bb99cc1a0 --- /dev/null +++ b/test/data/config/test_close_bgtask_silent_vfs_off.toml @@ -0,0 +1,25 @@ +[general] +version = "0.4.0" +time_zone = "utc-8" + +[network] +[log] +log_level = "critical" + +[storage] +data_dir = "/var/infinity/data" +# close auto optimize +optimize_interval = "0s" +# close auto cleanup task +cleanup_interval = "0s" +# close auto compaction +compact_interval = "0s" + +[buffer] +[wal] +# close delta checkpoint +delta_checkpoint_interval = "0s" +# close full checkpoint +full_checkpoint_interval = "0s" + +[resource]