Skip to content

Commit

Permalink
[Enhancement] Optimize upload and download for backup restore (backport #52574) (#52623)
Browse files Browse the repository at this point in the history

Co-authored-by: xiangguangyxg <[email protected]>
  • Loading branch information
mergify[bot] and xiangguangyxg authored Nov 5, 2024
1 parent 9be095c commit 001c40d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 28 deletions.
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ CONF_Int32(update_schema_worker_count, "3");
CONF_mInt32(upload_worker_count, "0");
// The number of threads used for download.
CONF_mInt32(download_worker_count, "0");
// The buffer size, in bytes, used when copying a file during upload.
CONF_mInt32(upload_buffer_size, "4194304");
// The buffer size, in bytes, used when copying a file during download.
CONF_mInt32(download_buffer_size, "4194304");
// The number of threads used to make snapshots.
CONF_mInt32(make_snapshot_worker_count, "5");
// The number of threads used to release snapshots.
Expand Down
44 changes: 16 additions & 28 deletions be/src/runtime/snapshot_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,12 @@ Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_d
std::map<int64_t, std::vector<std::string>>* tablet_files) {
if (!upload.__isset.use_broker || upload.use_broker) {
LOG(INFO) << "begin to upload snapshot files. num: " << src_to_dest_path.size()
<< ", broker addr: " << upload.broker_addr << ", job: " << _job_id << ", task" << _task_id;
<< ", broker addr: " << upload.broker_addr << ", job id: " << _job_id << ", task id: " << _task_id;
} else {
LOG(INFO) << "begin to upload snapshot files. num: " << src_to_dest_path.size() << ", job: " << _job_id
<< ", task" << _task_id;
LOG(INFO) << "begin to upload snapshot files. num: " << src_to_dest_path.size() << ", job id: " << _job_id
<< ", task id: " << _task_id;
}

// check if job has already been cancelled
int tmp_counter = 1;
RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::UPLOAD));

Status status = Status::OK();
// 1. validate local tablet snapshot paths
RETURN_IF_ERROR(_check_local_snapshot_paths(src_to_dest_path, true));
Expand Down Expand Up @@ -201,12 +197,11 @@ Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_d
}
}
ASSIGN_OR_RETURN(auto input_file, FileSystem::Default()->new_sequential_file(local_file_path));
auto res = fs::copy(input_file.get(), remote_writable_file.get(), 1024 * 1024);
if (!res.ok()) {
return res.status();
}
LOG(INFO) << "finished to write the file: " << local_file_path << ", length: " << *res;
ASSIGN_OR_RETURN(auto file_size,
fs::copy(input_file.get(), remote_writable_file.get(), config::upload_buffer_size));
RETURN_IF_ERROR(remote_writable_file->close());
LOG(INFO) << "finished to upload file: " << local_file_path << ", length: " << file_size;

// rename file to end with ".md5sum"
if (!upload.__isset.use_broker || upload.use_broker) {
RETURN_IF_ERROR(
Expand All @@ -224,7 +219,7 @@ Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_d
LOG(INFO) << "finished to write tablet to remote. local path: " << src_path << ", remote path: " << dest_path;
} // end for each tablet path

LOG(INFO) << "finished to upload snapshots. job: " << _job_id << ", task id: " << _task_id;
LOG(INFO) << "finished to upload snapshots. job id: " << _job_id << ", task id: " << _task_id;
return status;
}

Expand All @@ -237,16 +232,12 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to
const TDownloadReq& download, std::vector<int64_t>* downloaded_tablet_ids) {
if (!download.__isset.use_broker || download.use_broker) {
LOG(INFO) << "begin to download snapshot files. num: " << src_to_dest_path.size()
<< ", broker addr: " << download.broker_addr << ", job: " << _job_id << ", task id: " << _task_id;
<< ", broker addr: " << download.broker_addr << ", job id: " << _job_id << ", task id: " << _task_id;
} else {
LOG(INFO) << "begin to download snapshot files. num: " << src_to_dest_path.size() << ", job: " << _job_id
LOG(INFO) << "begin to download snapshot files. num: " << src_to_dest_path.size() << ", job id: " << _job_id
<< ", task id: " << _task_id;
}

// check if job has already been cancelled
int tmp_counter = 1;
RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD));

Status status = Status::OK();
// 1. validate local tablet snapshot paths
RETURN_IF_ERROR(_check_local_snapshot_paths(src_to_dest_path, false));
Expand Down Expand Up @@ -389,11 +380,8 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to
// 3. open local file for write
WritableFileOptions opts{.sync_on_close = false, .mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE};
ASSIGN_OR_RETURN(auto local_file, FileSystem::Default()->new_writable_file(opts, full_local_file));

auto res = fs::copy(remote_sequential_file.get(), local_file.get(), 1024 * 1024);
if (!res.ok()) {
return res.status();
}
ASSIGN_OR_RETURN(auto file_size,
fs::copy(remote_sequential_file.get(), local_file.get(), config::download_buffer_size));
RETURN_IF_ERROR(local_file->close());

// 5. check md5 of the downloaded file
Expand All @@ -409,7 +397,7 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to

// local_files always keep the updated local files
local_files.push_back(local_file_name);
LOG(INFO) << "finished to download the file: " << full_local_file << ", length: " << file_len;
LOG(INFO) << "finished to download the file: " << full_local_file << ", length: " << file_size;
} // end for all remote files

// finally, delete local files which are not in remote
Expand Down Expand Up @@ -440,7 +428,7 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to
finished_num++;
} // end for src_to_dest_path

LOG(INFO) << "finished to download snapshots. job: " << _job_id << ", task id: " << _task_id;
LOG(INFO) << "finished to download snapshots. job id: " << _job_id << ", task id: " << _task_id;
return status;
}

Expand All @@ -450,7 +438,7 @@ Status SnapshotLoader::primary_key_move(const std::string& snapshot_path, const
std::string tablet_path = tablet->schema_hash_path();
std::string store_path = tablet->data_dir()->path();
LOG(INFO) << "begin to move snapshot files. from: " << snapshot_path << ", to: " << tablet_path
<< ", store: " << store_path << ", job: " << _job_id << ", task id: " << _task_id;
<< ", store: " << store_path << ", job id: " << _job_id << ", task id: " << _task_id;

Status status = Status::OK();

Expand Down Expand Up @@ -587,7 +575,7 @@ Status SnapshotLoader::move(const std::string& snapshot_path, const TabletShared
std::string tablet_path = tablet->schema_hash_path();
std::string store_path = tablet->data_dir()->path();
LOG(INFO) << "begin to move snapshot files. from: " << snapshot_path << ", to: " << tablet_path
<< ", store: " << store_path << ", job: " << _job_id << ", task id: " << _task_id;
<< ", store: " << store_path << ", job id: " << _job_id << ", task id: " << _task_id;

Status status = Status::OK();

Expand Down

0 comments on commit 001c40d

Please sign in to comment.