[Feature] (batch write part3) Implement batch write on BE #52556

Merged · 21 commits · Nov 14, 2024
9 changes: 9 additions & 0 deletions be/src/common/config.h
@@ -1489,4 +1489,13 @@ CONF_mInt32(max_committed_without_schema_rowset, "1000");

CONF_mInt32(apply_version_slow_log_sec, "30");

CONF_Int32(batch_write_thread_pool_num_min, "0");
CONF_Int32(batch_write_thread_pool_num_max, "512");
CONF_Int32(batch_write_thread_pool_queue_size, "4096");
CONF_mInt32(batch_write_default_timeout_ms, "600000");
CONF_mInt32(batch_write_rpc_request_retry_num, "10");
CONF_mInt32(batch_write_rpc_request_retry_interval_ms, "500");
CONF_mInt32(batch_write_rpc_reqeust_timeout_ms, "10000");
CONF_mInt32(batch_write_poll_load_status_interval_ms, "200");
CONF_mBool(batch_write_trace_log_enable, "false");
} // namespace starrocks::config
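
The `m` in `CONF_mInt32`/`CONF_mBool` marks a config as mutable at runtime, while the plain `CONF_Int32` thread-pool settings are fixed at startup. As a rough sketch of how the retry knobs above might be consumed; `rpc_call()` and `sleep_ms()` are hypothetical stand-ins, not part of this patch:

```cpp
#include "common/config.h"
#include "common/status.h"

Status rpc_call(int timeout_ms);  // hypothetical stand-in for the actual RPC
void sleep_ms(int ms);            // hypothetical stand-in for a sleep helper

// Sketch: retry an RPC using the batch-write retry configs added above.
Status send_rpc_with_retry() {
    Status st;
    for (int attempt = 0; attempt < config::batch_write_rpc_request_retry_num; ++attempt) {
        st = rpc_call(/*timeout_ms=*/config::batch_write_rpc_reqeust_timeout_ms);
        if (st.ok()) {
            return st;
        }
        sleep_ms(config::batch_write_rpc_request_retry_interval_ms);
    }
    return st;  // last error after exhausting the retries
}
```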
3 changes: 0 additions & 3 deletions be/src/exec/connector_scan_node.cpp
@@ -279,9 +279,6 @@ Status ConnectorScanNode::_start_scan_thread(RuntimeState* state) {
}

Status ConnectorScanNode::_create_and_init_scanner(RuntimeState* state, TScanRange& scan_range) {
if (scan_range.__isset.broker_scan_range) {
scan_range.broker_scan_range.params.__set_non_blocking_read(false);
}
connector::DataSourcePtr data_source = _data_source_provider->create_data_source(scan_range);
data_source->set_predicates(_conjunct_ctxs);
data_source->set_runtime_filters(&_runtime_filter_collector);
6 changes: 6 additions & 0 deletions be/src/exec/csv_scanner.cpp
@@ -305,6 +305,7 @@ StatusOr<ChunkPtr> CSVScanner::get_next() {
// if timeout happens at the beginning of reading src_chunk, we return the error state
// else we will _materialize the lines read before timeout
if (src_chunk->num_rows() == 0) {
_reusable_empty_chunk.swap(src_chunk);
return status;
}
} else {
@@ -554,6 +555,11 @@ Status CSVScanner::_parse_csv(Chunk* chunk) {
}

ChunkPtr CSVScanner::_create_chunk(const std::vector<SlotDescriptor*>& slots) {
if (_reusable_empty_chunk) {
DCHECK(_reusable_empty_chunk->is_empty());
return std::move(_reusable_empty_chunk);
}

SCOPED_RAW_TIMER(&_counter->init_chunk_ns);

auto chunk = std::make_shared<Chunk>();
5 changes: 5 additions & 0 deletions be/src/exec/csv_scanner.h
@@ -97,6 +97,11 @@ class CSVScanner final : public FileScanner {
bool _use_v2;
CSVReader::Fields fields;
CSVRow row;

// An empty chunk that can be reused as the container for the result of get_next().
// It mainly optimizes the case where get_next() returns a timeout status frequently,
// by avoiding the cost of creating a new chunk on each call.
ChunkPtr _reusable_empty_chunk = nullptr;
};

} // namespace starrocks
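
The `_reusable_empty_chunk` trick is a small allocation-recycling pattern: when `get_next()` times out before any row was appended, the still-empty chunk is stashed and handed back by the next `_create_chunk()` call. A standalone sketch of the pattern, using `std::vector<int>` as a stand-in for `Chunk`:

```cpp
#include <memory>
#include <utility>
#include <vector>

// Stand-in for starrocks::Chunk; only emptiness matters for this pattern.
using ChunkPtr = std::shared_ptr<std::vector<int>>;

struct ScannerSketch {
    ChunkPtr _reusable_empty_chunk;

    ChunkPtr create_chunk() {
        if (_reusable_empty_chunk) {
            // Reuse the chunk cached by a previous timed-out call instead of
            // paying the construction cost again.
            return std::move(_reusable_empty_chunk);
        }
        return std::make_shared<std::vector<int>>();  // the expensive path
    }

    void on_timeout(ChunkPtr& chunk) {
        if (chunk->empty()) {
            _reusable_empty_chunk.swap(chunk);  // stash it for the next call
        }
    }
};
```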
6 changes: 1 addition & 5 deletions be/src/exec/file_scanner.cpp
@@ -284,11 +284,7 @@ Status FileScanner::create_sequential_file(const TBrokerRangeDesc& range_desc, c
range_desc.load_id.printTo(ss);
return Status::InternalError(std::string(ss.str()));
}
bool non_blocking_read = false;
if (params.__isset.non_blocking_read) {
non_blocking_read = params.non_blocking_read;
}
auto stream = std::make_shared<StreamLoadPipeInputStream>(std::move(pipe), non_blocking_read);
auto stream = std::make_shared<StreamLoadPipeInputStream>(std::move(pipe));
src_file = std::make_shared<SequentialFile>(std::move(stream), "stream-load-pipe");
break;
}
17 changes: 16 additions & 1 deletion be/src/exec/json_scanner.cpp
@@ -110,6 +110,9 @@ StatusOr<ChunkPtr> JsonScanner::get_next() {
// return Status::EndOfFile("EOF of reading json file, nothing read");
return src_chunk;
} else if (status.is_time_out()) {
if (src_chunk->is_empty()) {
_reusable_empty_chunk.swap(src_chunk);
}
// if timeout happens at the beginning of reading src_chunk, we return the error state
// else we will _materialize the lines read before timeout and return ok()
return status;
@@ -241,6 +244,12 @@ Status JsonScanner::parse_json_paths(const std::string& jsonpath, std::vector<st
}

Status JsonScanner::_create_src_chunk(ChunkPtr* chunk) {
if (_reusable_empty_chunk) {
DCHECK(_reusable_empty_chunk->is_empty());
_reusable_empty_chunk.swap(*chunk);
return Status::OK();
}

SCOPED_RAW_TIMER(&_counter->init_chunk_ns);
*chunk = std::make_shared<Chunk>();
size_t slot_size = _src_slot_descriptors.size();
@@ -283,7 +292,13 @@ Status JsonScanner::_open_next_reader() {
}
_cur_file_reader = std::make_unique<JsonReader>(_state, _counter, this, file, _strict_mode, _src_slot_descriptors,
_json_types, range_desc);
RETURN_IF_ERROR(_cur_file_reader->open());
st = _cur_file_reader->open();
// A timeout can happen when reading data from a TimeBoundedStreamLoadPipe.
// In that case the open is still treated as successful, and the caller just
// needs to retry reading data next time.
if (!st.ok() && !st.is_time_out()) {
return st;
}
_next_range++;
return Status::OK();
}
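
The change to `_open_next_reader()` encodes a small status-filtering idiom: a timeout from a time-bounded pipe is not a failure of the open itself. A minimal sketch of that idiom as a hypothetical wrapper (not part of the patch):

```cpp
// Tolerate TimedOut from open(): a TimeBoundedStreamLoadPipe may simply have
// no data available yet, so the reader is kept and reads are retried later.
Status open_tolerating_timeout(JsonReader* reader) {
    Status st = reader->open();
    if (st.ok() || st.is_time_out()) {
        return Status::OK();  // reader stays usable; the next read will retry
    }
    return st;  // genuine error: propagate to the caller
}
```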
5 changes: 5 additions & 0 deletions be/src/exec/json_scanner.h
@@ -74,6 +74,11 @@ class JsonScanner : public FileScanner {
std::vector<std::vector<SimpleJsonPath>> _json_paths;
std::vector<SimpleJsonPath> _root_paths;
bool _strip_outer_array = false;

// An empty chunk that can be reused as the container for the result of get_next().
// It mainly optimizes the case where get_next() returns a timeout status frequently,
// by avoiding the cost of creating a new chunk on each call.
ChunkPtr _reusable_empty_chunk = nullptr;
};

// Reader to parse the json.
48 changes: 19 additions & 29 deletions be/src/exec/pipeline/fragment_context.cpp
@@ -21,6 +21,7 @@
#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/pipeline/stream_pipeline_driver.h"
#include "exec/workgroup/work_group.h"
#include "runtime/batch_write/batch_write_mgr.h"
#include "runtime/client_cache.h"
#include "runtime/data_stream_mgr.h"
#include "runtime/exec_env.h"
@@ -35,6 +36,7 @@ namespace starrocks::pipeline
FragmentContext::FragmentContext() : _data_sink(nullptr) {}

FragmentContext::~FragmentContext() {
_close_stream_load_contexts();
_data_sink.reset();
_runtime_filter_hub.close_all_in_filters(_runtime_state.get());
close_all_execution_groups();
@@ -197,6 +199,12 @@ void FragmentContext::set_final_status(const Status& status) {
auto* executor = executors->driver_executor();
iterate_drivers([executor](const DriverPtr& driver) { executor->cancel(driver.get()); });
}

for (const auto& stream_load_context : _stream_load_contexts) {
if (stream_load_context->body_sink) {
stream_load_context->body_sink->cancel(_s_status);
}
}
}
}

@@ -218,7 +226,6 @@ Status FragmentContext::prepare_all_pipelines() {

void FragmentContext::set_stream_load_contexts(const std::vector<StreamLoadContext*>& contexts) {
_stream_load_contexts = std::move(contexts);
_channel_stream_load = true;
}

void FragmentContext::cancel(const Status& status) {
@@ -235,19 +242,6 @@ void FragmentContext::cancel(const Status& status) {
query_options.load_job_type == TLoadJobType::INSERT_VALUES)) {
ExecEnv::GetInstance()->profile_report_worker()->unregister_pipeline_load(_query_id, _fragment_instance_id);
}

if (_stream_load_contexts.size() > 0) {
for (const auto& stream_load_context : _stream_load_contexts) {
if (stream_load_context->body_sink) {
Status st;
stream_load_context->body_sink->cancel(st);
}
if (_channel_stream_load) {
_runtime_state->exec_env()->stream_context_mgr()->remove_channel_context(stream_load_context);
}
}
_stream_load_contexts.resize(0);
}
}

FragmentContext* FragmentContextManager::get_or_register(const TUniqueId& fragment_id) {
@@ -312,21 +306,6 @@ void FragmentContextManager::unregister(const TUniqueId& fragment_id) {
ExecEnv::GetInstance()->profile_report_worker()->unregister_pipeline_load(it->second->query_id(),
fragment_id);
}
const auto& stream_load_contexts = it->second->_stream_load_contexts;

if (stream_load_contexts.size() > 0) {
for (const auto& stream_load_context : stream_load_contexts) {
if (stream_load_context->body_sink) {
Status st;
stream_load_context->body_sink->cancel(st);
}
if (it->second->_channel_stream_load) {
it->second->_runtime_state->exec_env()->stream_context_mgr()->remove_channel_context(
stream_load_context);
}
}
it->second->_stream_load_contexts.resize(0);
}
_fragment_contexts.erase(it);
}
}
@@ -410,4 +389,15 @@ Status FragmentContext::submit_active_drivers(DriverExecutor* executor) {
return Status::OK();
}

void FragmentContext::_close_stream_load_contexts() {
for (const auto& context : _stream_load_contexts) {
context->body_sink->cancel(Status::Cancelled("Close the stream load pipe"));
if (context->enable_batch_write) {
_runtime_state->exec_env()->batch_write_mgr()->unregister_stream_load_pipe(context);
} else {
_runtime_state->exec_env()->stream_context_mgr()->remove_channel_context(context);
}
}
}

} // namespace starrocks::pipeline
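
This file consolidates cleanup: the per-callsite teardown that previously lived in `cancel()` and `FragmentContextManager::unregister()` now happens once in `~FragmentContext()` via `_close_stream_load_contexts()`, which dispatches to `batch_write_mgr()` or `stream_context_mgr()` based on `enable_batch_write`. Canceling `body_sink` in `set_final_status()` matters because a scanner can be blocked reading an empty pipe; the cancel is what wakes it so the fragment can wind down. A standalone sketch of that wake-on-cancel behavior (toy code, not the StarRocks pipe):

```cpp
#include <condition_variable>
#include <mutex>
#include <optional>
#include <string>
#include <vector>

// Toy pipe: read() blocks until data or cancellation; cancel() wakes readers.
class PipeSketch {
public:
    void cancel(std::string reason) {
        std::lock_guard<std::mutex> lock(_mutex);
        _cancel_reason = std::move(reason);
        _cv.notify_all();  // unblock any reader stuck in read()
    }

    std::optional<int> read() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [&] { return !_data.empty() || _cancel_reason.has_value(); });
        if (!_data.empty()) {
            int v = _data.back();
            _data.pop_back();
            return v;
        }
        return std::nullopt;  // canceled: caller sees "no more data"
    }

    void write(int v) {
        std::lock_guard<std::mutex> lock(_mutex);
        _data.push_back(v);
        _cv.notify_one();
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    std::vector<int> _data;
    std::optional<std::string> _cancel_reason;
};
```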
3 changes: 2 additions & 1 deletion be/src/exec/pipeline/fragment_context.h
@@ -172,6 +172,8 @@ class FragmentContext {
void set_report_when_finish(bool report) { _report_when_finish = report; }

private:
void _close_stream_load_contexts();

bool _enable_group_execution = false;
// Id of this query
TUniqueId _query_id;
@@ -209,7 +211,6 @@ class FragmentContext {
query_cache::CacheParam _cache_param;
bool _enable_cache = false;
std::vector<StreamLoadContext*> _stream_load_contexts;
bool _channel_stream_load = false;

// STREAM MV
std::atomic<size_t> _num_finished_epoch_pipelines = 0;
43 changes: 35 additions & 8 deletions be/src/exec/pipeline/fragment_executor.cpp
@@ -39,6 +39,7 @@
#include "exec/workgroup/work_group.h"
#include "gutil/casts.h"
#include "gutil/map_util.h"
#include "runtime/batch_write/batch_write_mgr.h"
#include "runtime/data_stream_mgr.h"
#include "runtime/data_stream_sender.h"
#include "runtime/descriptors.h"
@@ -623,6 +624,21 @@ Status FragmentExecutor::_prepare_stream_load_pipe(ExecEnv* exec_env, const Unif
return Status::OK();
}
std::vector<StreamLoadContext*> stream_load_contexts;

bool success = false;
DeferOp defer_op([&] {
if (!success) {
for (auto& ctx : stream_load_contexts) {
ctx->body_sink->cancel(Status::Cancelled("Failed to prepare stream load pipe"));
if (ctx->enable_batch_write) {
exec_env->batch_write_mgr()->unregister_stream_load_pipe(ctx);
} else {
exec_env->stream_context_mgr()->remove_channel_context(ctx);
}
}
}
});

for (; iter != scan_range_map.end(); iter++) {
for (; iter2 != iter->second.end(); iter2++) {
for (const auto& scan_range : iter2->second) {
@@ -634,19 +650,30 @@ Status FragmentExecutor::_prepare_stream_load_pipe(ExecEnv* exec_env, const Unif
TFileFormatType::type format = broker_scan_range.ranges[0].format_type;
TUniqueId load_id = broker_scan_range.ranges[0].load_id;
long txn_id = broker_scan_range.params.txn_id;
bool is_batch_write =
broker_scan_range.__isset.enable_batch_write && broker_scan_range.enable_batch_write;
StreamLoadContext* ctx = nullptr;
RETURN_IF_ERROR(exec_env->stream_context_mgr()->create_channel_context(
exec_env, label, channel_id, db_name, table_name, format, ctx, load_id, txn_id));
DeferOp op([&] {
if (ctx->unref()) {
delete ctx;
}
});
RETURN_IF_ERROR(exec_env->stream_context_mgr()->put_channel_context(label, channel_id, ctx));
if (is_batch_write) {
ASSIGN_OR_RETURN(ctx, BatchWriteMgr::create_and_register_pipe(
exec_env, exec_env->batch_write_mgr(), db_name, table_name,
broker_scan_range.batch_write_parameters, label, txn_id, load_id,
broker_scan_range.batch_write_interval_ms));
} else {
RETURN_IF_ERROR(exec_env->stream_context_mgr()->create_channel_context(
exec_env, label, channel_id, db_name, table_name, format, ctx, load_id, txn_id));
DeferOp op([&] {
if (ctx->unref()) {
delete ctx;
}
});
RETURN_IF_ERROR(exec_env->stream_context_mgr()->put_channel_context(label, channel_id, ctx));
}
stream_load_contexts.push_back(ctx);
}
}
}

success = true;
_fragment_ctx->set_stream_load_contexts(stream_load_contexts);
return Status::OK();
}
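
`_prepare_stream_load_pipe()` uses the success-flag plus `DeferOp` idiom so that any early `RETURN_IF_ERROR` unwinds the contexts registered so far. A minimal sketch of the idiom, assuming `DeferOp` (from `util/defer_op.h`) runs its callback at scope exit; `register_one()` is a hypothetical stand-in:

```cpp
#include <vector>

#include "common/status.h"
#include "util/defer_op.h"

Status register_one(std::vector<StreamLoadContext*>* contexts);  // hypothetical

// The guard fires on every exit path, but its cleanup body is disarmed once
// execution reaches the success point.
Status prepare_contexts(std::vector<StreamLoadContext*>& contexts) {
    bool success = false;
    DeferOp rollback([&] {
        if (!success) {
            for (auto* ctx : contexts) {
                ctx->body_sink->cancel(Status::Cancelled("prepare failed"));
            }
        }
    });

    for (int i = 0; i < 3; ++i) {
        RETURN_IF_ERROR(register_one(&contexts));  // early return triggers rollback
    }

    success = true;  // commit: disarm the rollback
    return Status::OK();
}
```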
3 changes: 0 additions & 3 deletions be/src/exec/pipeline/scan/connector_scan_operator.cpp
@@ -635,9 +635,6 @@ ConnectorChunkSource::ConnectorChunkSource(ScanOperator* op, RuntimeProfile* run
TScanRange* scan_range = scan_morsel->get_scan_range();
ScanSplitContext* split_context = scan_morsel->get_split_context();

if (scan_range->__isset.broker_scan_range) {
scan_range->broker_scan_range.params.__set_non_blocking_read(true);
}
_data_source = scan_node->data_source_provider()->create_data_source(*scan_range);
_data_source->set_driver_sequence(op->get_driver_sequence());
_data_source->set_split_context(split_context);