Skip to content

Commit

Permalink
new virtual store and local_file_handle (infiniflow#1947)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

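Replace the `virtual_storage` module with a new `virtual_store` module: local filesystem helpers move from `VirtualStorage::*Local` (and some direct `LocalFileSystem` uses) to `LocalStore` (`Exists`, `MakeDirectory`, `CleanupDirectory`, `Open`), and JSONL export now writes through the new `local_file_handle` returned by `LocalStore::Open`.
Also rename `file_handler` to `file_handle` at the touched call sites, and make the leader's heartbeat check read the interval from `this_node_` after verifying that the node role is leader.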

### Type of change

- [x] Refactoring

---------

Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN authored Sep 30, 2024
1 parent 8b46f2c commit 3319bb8
Show file tree
Hide file tree
Showing 68 changed files with 1,239 additions and 701 deletions.
4 changes: 2 additions & 2 deletions benchmark/local_infinity/fulltext/fulltext_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import match_expr;
import function_expr;
import search_expr;
import column_expr;
import virtual_storage;
import virtual_store;

using namespace infinity;

Expand Down Expand Up @@ -123,7 +123,7 @@ void BenchmarkImport(SharedPtr<Infinity> infinity,
const String &db_name,
const String &table_name,
const String &import_from) {
if (!VirtualStorage::ExistsLocal(import_from)) {
if (!LocalStore::Exists(import_from)) {
LOG_ERROR(fmt::format("Data file doesn't exist: {}", import_from));
return;
}
Expand Down
6 changes: 3 additions & 3 deletions benchmark/local_infinity/infinity_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import knn_expr;
import column_def;
import statement_common;
import data_type;
import virtual_storage;
import virtual_store;

using namespace infinity;

Expand Down Expand Up @@ -84,7 +84,7 @@ int main() {

String path = "/var/infinity";

VirtualStorage::CleanupDirectoryLocal(path);
LocalStore::CleanupDirectory(path);

Infinity::LocalInit(path);

Expand Down Expand Up @@ -373,7 +373,7 @@ int main() {
auto r2 = infinity->CreateTable(db_name, table_name, std::move(column_defs), std::vector<TableConstraint *>{}, std::move(create_tb_options));

std::string sift_base_path = std::string(test_data_path()) + "/benchmark/sift/base.fvecs";
if (!VirtualStorage::ExistsLocal(sift_base_path)) {
if (!LocalStore::Exists(sift_base_path)) {
std::cout << "File: " << sift_base_path << " doesn't exist" << std::endl;
break;
}
Expand Down
8 changes: 4 additions & 4 deletions benchmark/local_infinity/knn/knn_import_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import knn_expr;
import column_def;
import statement_common;
import data_type;
import virtual_storage;
import virtual_store;

using namespace infinity;

Expand All @@ -59,10 +59,10 @@ int main(int argc, char *argv[]) {
data_path = std::string(argv[3]);
}

if (VirtualStorage::ExistsLocal(data_path)) {
if (LocalStore::Exists(data_path)) {
std::cout << "Data path: " << data_path << " is already existed." << std::endl;
} else {
VirtualStorage::MakeDirectoryLocal(data_path);
LocalStore::MakeDirectory(data_path);
std::cout << "Data path: " << data_path << " is created." << std::endl;
}

Expand Down Expand Up @@ -113,7 +113,7 @@ int main(int argc, char *argv[]) {

// auto [ table, status2 ] = data_base->GetTable(table_name);

if (!VirtualStorage::ExistsLocal(base_path)) {
if (!LocalStore::Exists(base_path)) {
std::cout << "File: " << base_path << " doesn't exist" << std::endl;
break;
}
Expand Down
32 changes: 17 additions & 15 deletions src/executor/operator/physical_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ import status;
import buffer_manager;
import default_values;
import internal_types;
import virtual_store;
import local_file_handle;
import abstract_file_handle;

namespace infinity {

Expand Down Expand Up @@ -100,11 +103,11 @@ SizeT PhysicalExport::ExportToCSV(QueryContext *query_context, ExportOperatorSta
SizeT select_column_count = select_columns.size();

LocalFileSystem fs;
auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kWriteLock);
auto [file_handle, status] = fs.OpenFile(file_path_, FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kWriteLock);
if (!status.ok()) {
RecoverableError(status);
}
DeferFn file_defer([&]() { fs.Close(*file_handler); });
DeferFn file_defer([&]() { fs.Close(*file_handle); });

if (header_) {
// Output CSV header
Expand Down Expand Up @@ -135,7 +138,7 @@ SizeT PhysicalExport::ExportToCSV(QueryContext *query_context, ExportOperatorSta
header += '\n';
}
}
fs.Write(*file_handler, header.c_str(), header.size());
fs.Write(*file_handle, header.c_str(), header.size());
}

SizeT offset = offset_;
Expand Down Expand Up @@ -217,16 +220,16 @@ SizeT PhysicalExport::ExportToCSV(QueryContext *query_context, ExportOperatorSta

if (row_count > 0 && this->row_limit_ != 0 && (row_count % this->row_limit_) == 0) {
++file_no_;
fs.Close(*file_handler);
fs.Close(*file_handle);
String new_file_path = fmt::format("{}.part{}", file_path_, file_no_);
auto result = fs.OpenFile(new_file_path, FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kWriteLock);
if (!result.second.ok()) {
RecoverableError(result.second);
}
file_handler = std::move(result.first);
file_handle = std::move(result.first);
}

fs.Write(*file_handler, line.c_str(), line.size());
fs.Write(*file_handle, line.c_str(), line.size());

++row_count;
if (limit_ != 0 && row_count == limit_) {
Expand Down Expand Up @@ -257,12 +260,11 @@ SizeT PhysicalExport::ExportToJSONL(QueryContext *query_context, ExportOperatorS

SizeT select_column_count = select_columns.size();

LocalFileSystem fs;
auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kWriteLock);
auto [file_handle, status] = LocalStore::Open(file_path_, FileAccessMode::kWrite);
if (!status.ok()) {
RecoverableError(status);
}
DeferFn file_defer([&]() { fs.Close(*file_handler); });
DeferFn file_defer([&]() { file_handle->Close(); });

SizeT offset = offset_;
SizeT row_count{0};
Expand Down Expand Up @@ -347,18 +349,18 @@ SizeT PhysicalExport::ExportToJSONL(QueryContext *query_context, ExportOperatorS
}
if (row_count > 0 && this->row_limit_ != 0 && (row_count % this->row_limit_) == 0) {
++file_no_;
fs.Close(*file_handler);
file_handle->Close();
String new_file_path = fmt::format("{}.part{}", file_path_, file_no_);
auto result = fs.OpenFile(new_file_path, FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kWriteLock);
if (!result.second.ok()) {
RecoverableError(result.second);
auto [part_file_handle, part_status] = LocalStore::Open(new_file_path, FileAccessMode::kWrite);
if (!part_status.ok()) {
RecoverableError(part_status);
}
file_handler = std::move(result.first);
file_handle = std::move(part_file_handle);
}

// LOG_DEBUG(line_json.dump());
String to_write = line_json.dump() + "\n";
fs.Write(*file_handler, to_write.c_str(), to_write.size());
file_handle->Append(to_write.c_str(), to_write.size());
++row_count;
if (limit_ != 0 && row_count == limit_) {
return row_count;
Expand Down
9 changes: 7 additions & 2 deletions src/main/cluster_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,12 @@ void ClusterManager::CheckHeartBeat() {
}

void ClusterManager::CheckHeartBeatInner() {
auto hb_interval = std::chrono::milliseconds(leader_node_->heartbeat_interval_);
if (this_node_->node_role_ != NodeRole::kLeader) {
String error_message = "Invalid node role.";
UnrecoverableError(error_message);
}
// Past the role check above, this_node_ is expected to be the leader, so its own heartbeat interval is used.
auto hb_interval = std::chrono::milliseconds(this_node_->heartbeat_interval_);
while (true) {
std::unique_lock hb_lock(this->hb_mutex_);
this->hb_cv_.wait_for(hb_lock, hb_interval, [&] { return !this->hb_running_; });
Expand All @@ -245,7 +250,7 @@ void ClusterManager::CheckHeartBeatInner() {

for (auto &[node_name, node_info] : other_node_map_) {
if (node_info->node_status_ == NodeStatus::kAlive) {
if (node_info->last_update_ts_ + 2 * leader_node_->heartbeat_interval_ < this_node_->last_update_ts_) {
if (node_info->last_update_ts_ + 2 * this_node_->heartbeat_interval_ < this_node_->last_update_ts_) {
node_info->node_status_ = NodeStatus::kTimeout;
LOG_INFO(fmt::format("Node {} is timeout", node_name));
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import boost;
import compilation_config;
import default_values;
import logger;
import virtual_storage;
import virtual_store;
import utility;
import status;
import options;
Expand Down Expand Up @@ -116,7 +116,7 @@ Status Config::ParseTimeInfo(const String &time_info, i64 &time_seconds) {

Status Config::Init(const SharedPtr<String> &config_path, DefaultConfig *default_config) {
toml::table config_toml{};
if (config_path.get() == nullptr || config_path->empty() || !VirtualStorage::ExistsLocal(std::filesystem::absolute(*config_path))) {
if (config_path.get() == nullptr || config_path->empty() || !LocalStore::Exists(std::filesystem::absolute(*config_path))) {
if (config_path.get() == nullptr || config_path->empty()) {
fmt::print("No config file is given, use default configs.\n");
} else {
Expand Down
6 changes: 3 additions & 3 deletions src/main/infinity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import config;
import resource_manager;
import task_scheduler;
import storage;
import local_file_system;
import virtual_store;
import third_party;
import query_options;
import query_result;
Expand Down Expand Up @@ -57,6 +57,7 @@ import query_options;
import extra_ddl_info;
import drop_index_info;
import drop_table_info;
import third_party;

import infinity_exception;
import third_party;
Expand All @@ -68,10 +69,9 @@ u64 Infinity::GetSessionId() { return session_->session_id(); }
void Infinity::Hello() { fmt::print("hello infinity\n"); }

void Infinity::LocalInit(const String &path) {
LocalFileSystem fs;

SharedPtr<String> config_path = MakeShared<String>(std::filesystem::absolute(path + "/infinity_conf.toml"));
if (fs.Exists(*config_path)) {
if (LocalStore::Exists(*config_path)) {
InfinityContext::instance().Init(config_path);
} else {
UniquePtr<DefaultConfig> default_config = MakeUnique<DefaultConfig>();
Expand Down
3 changes: 0 additions & 3 deletions src/main/infinity_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,8 @@ void InfinityContext::Init(const SharedPtr<String> &config_path, bool admin_flag
config_ = MakeUnique<Config>();
auto status = config_->Init(config_path, default_config);
if (!status.ok()) {
fmt::print("Error: {}", *status.msg_);
std::exit(static_cast<int>(status.code()));
}

Logger::Initialize(config_.get());

resource_manager_ = MakeUnique<ResourceManager>(config_->CPULimit(), 0);
Expand All @@ -74,7 +72,6 @@ void InfinityContext::Init(const SharedPtr<String> &config_path, bool admin_flag
UnrecoverableError(status.message());
return;
}

if (admin_flag or config_->ServerMode() == "cluster") {
// Admin mode or cluster start phase
return;
Expand Down
10 changes: 3 additions & 7 deletions src/planner/logical_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ import logical_command;
import explain_logical_plan;
import explain_ast;

import local_file_system;
import virtual_store;

import status;
import default_values;
Expand Down Expand Up @@ -918,10 +918,8 @@ Status LogicalPlanner::BuildExport(const CopyStatement *statement, SharedPtr<Bin
}

// Check the file existence
LocalFileSystem fs;

String to_write_path;
if (fs.Exists(statement->file_path_)) {
if (LocalStore::Exists(statement->file_path_)) {
Status status = Status::DuplicatedFile(statement->file_path_);
RecoverableError(status);
}
Expand Down Expand Up @@ -1080,10 +1078,8 @@ Status LogicalPlanner::BuildImport(const CopyStatement *statement, SharedPtr<Bin
}

// Check the file existence
LocalFileSystem fs;

String to_write_path;
if (!fs.Exists(statement->file_path_)) {
if (!LocalStore::Exists(statement->file_path_)) {
RecoverableError(Status::FileNotFound(statement->file_path_));
}

Expand Down
8 changes: 4 additions & 4 deletions src/storage/buffer/file_worker/bmp_index_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ BMPIndexFileWorker::BMPIndexFileWorker(SharedPtr<String> data_dir,
LocalFileSystem fs;

String index_path = GetFilePath();
auto [file_handler, status] = fs.OpenFile(index_path, FileFlags::READ_FLAG, FileLockType::kNoLock);
auto [file_handle, status] = fs.OpenFile(index_path, FileFlags::READ_FLAG, FileLockType::kNoLock);
if (status.ok()) {
// When replay by full checkpoint, the data is deleted, but catalog is recovered. Do not read file in recovery.
index_size = fs.GetFileSize(*file_handler);
index_size = fs.GetFileSize(*file_handle);
}
}
index_size_ = index_size;
Expand Down Expand Up @@ -104,7 +104,7 @@ bool BMPIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success, c
if constexpr (std::is_same_v<T, std::nullptr_t>) {
UnrecoverableError("Invalid index type.");
} else {
index->Save(*file_handler_);
index->Save(*file_handle_);
}
},
*bmp_index);
Expand All @@ -125,7 +125,7 @@ void BMPIndexFileWorker::ReadFromFileImpl(SizeT file_size) {
UnrecoverableError("Invalid index type.");
} else {
using IndexT = std::decay_t<decltype(*index)>;
index = new IndexT(IndexT::Load(*file_handler_));
index = new IndexT(IndexT::Load(*file_handle_));
}
},
*bmp_index);
Expand Down
Loading

0 comments on commit 3319bb8

Please sign in to comment.