Commit

tmpt
Signed-off-by: PengFei Li <[email protected]>
banmoy committed Nov 3, 2024
1 parent a13c2a3 commit 906086d
Showing 37 changed files with 1,129 additions and 106 deletions.
5 changes: 5 additions & 0 deletions be/src/common/config.h
@@ -1485,4 +1485,9 @@ CONF_mInt32(max_committed_without_schema_rowset, "1000");

CONF_mInt32(apply_version_slow_log_sec, "30");

CONF_mInt32(batch_write_thread_pool_num_min, "0");
CONF_mInt32(batch_write_thread_pool_num_max, "512");
CONF_mInt32(batch_write_retry_num, "10");
CONF_mInt32(batch_write_request_interval_ms, "500");
CONF_mInt32(batch_write_reqeust_timeout_ms, "10000");
} // namespace starrocks::config
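
The five new knobs use the existing mutable `CONF_mInt32` pattern, so they can be tuned at runtime. A minimal sketch of how the thread-pool bounds might be consumed, assuming the BE's Kudu-derived `ThreadPoolBuilder` from `util/threadpool.h`; the pool name and init function here are illustrative, not part of this commit:

```cpp
// Sketch only: one plausible way the new knobs could bound a batch-write
// thread pool. ThreadPoolBuilder mirrors the Kudu-derived API in
// be/src/util/threadpool.h; the exact call site in the real batch-write
// manager may differ.
#include "common/config.h"
#include "util/threadpool.h"

namespace starrocks {

Status init_batch_write_pool(std::unique_ptr<ThreadPool>* pool) {
    return ThreadPoolBuilder("batch_write")  // hypothetical pool name
            .set_min_threads(config::batch_write_thread_pool_num_min)
            .set_max_threads(config::batch_write_thread_pool_num_max)
            .build(pool);
}

} // namespace starrocks
```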
3 changes: 0 additions & 3 deletions be/src/exec/connector_scan_node.cpp
@@ -279,9 +279,6 @@ Status ConnectorScanNode::_start_scan_thread(RuntimeState* state) {
}

Status ConnectorScanNode::_create_and_init_scanner(RuntimeState* state, TScanRange& scan_range) {
if (scan_range.__isset.broker_scan_range) {
scan_range.broker_scan_range.params.__set_non_blocking_read(false);
}
connector::DataSourcePtr data_source = _data_source_provider->create_data_source(scan_range);
data_source->set_predicates(_conjunct_ctxs);
data_source->set_runtime_filters(&_runtime_filter_collector);
6 changes: 6 additions & 0 deletions be/src/exec/csv_scanner.cpp
@@ -305,6 +305,7 @@ StatusOr<ChunkPtr> CSVScanner::get_next() {
// if timeout happens at the beginning of reading src_chunk, we return the error state
// else we will _materialize the lines read before timeout
if (src_chunk->num_rows() == 0) {
_reusable_empty_chunk.swap(src_chunk);
return status;
}
} else {
@@ -554,6 +555,11 @@ Status CSVScanner::_parse_csv(Chunk* chunk) {
}

ChunkPtr CSVScanner::_create_chunk(const std::vector<SlotDescriptor*>& slots) {
if (_reusable_empty_chunk) {
DCHECK(_reusable_empty_chunk->is_empty());
return std::move(_reusable_empty_chunk);
}

SCOPED_RAW_TIMER(&_counter->init_chunk_ns);

auto chunk = std::make_shared<Chunk>();
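
The chunk-reuse trick above is small enough to isolate. A self-contained sketch of the pattern with stand-in types (the real `Chunk` carries columns built from slot descriptors, which is what makes repeated construction worth avoiding):

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>

// Stand-in for starrocks::Chunk; the real one holds columns, which is what
// makes constructing a fresh chunk on every call expensive.
struct Chunk {
    std::size_t rows = 0;
    bool is_empty() const { return rows == 0; }
};
using ChunkPtr = std::shared_ptr<Chunk>;

class ScannerSketch {
public:
    // Mirrors _create_chunk(): hand back the cached empty chunk if we have one.
    ChunkPtr create_chunk() {
        if (_reusable_empty_chunk) {
            assert(_reusable_empty_chunk->is_empty());
            return std::move(_reusable_empty_chunk);
        }
        return std::make_shared<Chunk>();  // the expensive path in the real code
    }

    // Mirrors the timeout branch in get_next(): nothing was read, so stash the
    // still-empty chunk for the next call instead of destroying it.
    void on_timeout(ChunkPtr& src_chunk) {
        if (src_chunk->is_empty()) {
            _reusable_empty_chunk.swap(src_chunk);
        }
    }

private:
    ChunkPtr _reusable_empty_chunk;
};
```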
5 changes: 5 additions & 0 deletions be/src/exec/csv_scanner.h
@@ -97,6 +97,11 @@ class CSVScanner final : public FileScanner {
bool _use_v2;
CSVReader::Fields fields;
CSVRow row;

// An empty chunk that can be reused as the container for the result of get_next().
// This mainly optimizes the case where get_next() returns Status::Timeout frequently,
// by avoiding the construction of a new chunk on every call.
ChunkPtr _reusable_empty_chunk = nullptr;
};

} // namespace starrocks
6 changes: 1 addition & 5 deletions be/src/exec/file_scanner.cpp
@@ -284,11 +284,7 @@ Status FileScanner::create_sequential_file(const TBrokerRangeDesc& range_desc, c
range_desc.load_id.printTo(ss);
return Status::InternalError(std::string(ss.str()));
}
bool non_blocking_read = false;
if (params.__isset.non_blocking_read) {
non_blocking_read = params.non_blocking_read;
}
auto stream = std::make_shared<StreamLoadPipeInputStream>(std::move(pipe), non_blocking_read);
auto stream = std::make_shared<StreamLoadPipeInputStream>(std::move(pipe));
src_file = std::make_shared<SequentialFile>(std::move(stream), "stream-load-pipe");
break;
}
17 changes: 16 additions & 1 deletion be/src/exec/json_scanner.cpp
@@ -110,6 +110,9 @@ StatusOr<ChunkPtr> JsonScanner::get_next() {
// return Status::EndOfFile("EOF of reading json file, nothing read");
return src_chunk;
} else if (status.is_time_out()) {
if (src_chunk->is_empty()) {
_reusable_empty_chunk.swap(src_chunk);
}
// if timeout happens at the beginning of reading src_chunk, we return the error state
// else we will _materialize the lines read before timeout and return ok()
return status;
@@ -241,6 +244,12 @@ Status JsonScanner::parse_json_paths(const std::string& jsonpath, std::vector<st
}

Status JsonScanner::_create_src_chunk(ChunkPtr* chunk) {
if (_reusable_empty_chunk) {
DCHECK(_reusable_empty_chunk->is_empty());
_reusable_empty_chunk.swap(*chunk);
return Status::OK();
}

SCOPED_RAW_TIMER(&_counter->init_chunk_ns);
*chunk = std::make_shared<Chunk>();
size_t slot_size = _src_slot_descriptors.size();
@@ -283,7 +292,13 @@ Status JsonScanner::_open_next_reader() {
}
_cur_file_reader = std::make_unique<JsonReader>(_state, _counter, this, file, _strict_mode, _src_slot_descriptors,
_json_types, range_desc);
RETURN_IF_ERROR(_cur_file_reader->open());
st = _cur_file_reader->open();
// A timeout can happen when reading data from a TimeBoundedStreamLoadPipe.
// In that case the open is still considered successful; the reader just
// needs to retry reading data on the next call.
if (!st.ok() && !st.is_time_out()) {
return st;
}
_next_range++;
return Status::OK();
}
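
The open-path change encodes a retry contract: a timeout from a `TimeBoundedStreamLoadPipe` means "no bytes yet", not a broken source. A self-contained distillation of that control flow with a stand-in `Status` (only the pieces the pattern needs):

```cpp
#include <functional>

// Stand-in Status exposing just the bits this pattern uses.
struct Status {
    enum class Code { kOk, kTimedOut, kError };
    Code code;
    bool ok() const { return code == Code::kOk; }
    bool is_time_out() const { return code == Code::kTimedOut; }
    static Status OK() { return Status{Code::kOk}; }
};

// A timeout while opening a reader backed by a time-bounded pipe means
// "no data yet", not a broken source: report success and let the next
// get_next() call retry the read.
Status open_tolerating_timeout(const std::function<Status()>& open_fn) {
    Status st = open_fn();
    if (st.ok() || st.is_time_out()) {
        return Status::OK();
    }
    return st;  // genuine failures still propagate
}
```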
5 changes: 5 additions & 0 deletions be/src/exec/json_scanner.h
@@ -74,6 +74,11 @@ class JsonScanner : public FileScanner {
std::vector<std::vector<SimpleJsonPath>> _json_paths;
std::vector<SimpleJsonPath> _root_paths;
bool _strip_outer_array = false;

// An empty chunk that can be reused as the container for the result of get_next().
// This mainly optimizes the case where get_next() returns Status::Timeout frequently,
// by avoiding the construction of a new chunk on every call.
ChunkPtr _reusable_empty_chunk = nullptr;
};

// Reader to parse the json.
Expand Down
48 changes: 19 additions & 29 deletions be/src/exec/pipeline/fragment_context.cpp
@@ -21,6 +21,7 @@
#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/pipeline/stream_pipeline_driver.h"
#include "exec/workgroup/work_group.h"
#include "runtime/batch_write/batch_write_mgr.h"
#include "runtime/client_cache.h"
#include "runtime/data_stream_mgr.h"
#include "runtime/exec_env.h"
@@ -35,6 +36,7 @@ namespace starrocks::pipeline {
FragmentContext::FragmentContext() : _data_sink(nullptr) {}

FragmentContext::~FragmentContext() {
_close_stream_load_contexts();
_data_sink.reset();
_runtime_filter_hub.close_all_in_filters(_runtime_state.get());
close_all_execution_groups();
@@ -197,6 +199,12 @@ void FragmentContext::set_final_status(const Status& status) {
auto* executor = executors->driver_executor();
iterate_drivers([executor](const DriverPtr& driver) { executor->cancel(driver.get()); });
}

for (const auto& stream_load_context : _stream_load_contexts) {
if (stream_load_context->body_sink) {
stream_load_context->body_sink->cancel(_s_status);
}
}
}
}

@@ -218,7 +226,6 @@ Status FragmentContext::prepare_all_pipelines() {

void FragmentContext::set_stream_load_contexts(const std::vector<StreamLoadContext*>& contexts) {
_stream_load_contexts = std::move(contexts);
_channel_stream_load = true;
}

void FragmentContext::cancel(const Status& status) {
@@ -235,19 +242,6 @@ void FragmentContext::cancel(const Status& status) {
query_options.load_job_type == TLoadJobType::INSERT_VALUES)) {
ExecEnv::GetInstance()->profile_report_worker()->unregister_pipeline_load(_query_id, _fragment_instance_id);
}

if (_stream_load_contexts.size() > 0) {
for (const auto& stream_load_context : _stream_load_contexts) {
if (stream_load_context->body_sink) {
Status st;
stream_load_context->body_sink->cancel(st);
}
if (_channel_stream_load) {
_runtime_state->exec_env()->stream_context_mgr()->remove_channel_context(stream_load_context);
}
}
_stream_load_contexts.resize(0);
}
}

FragmentContext* FragmentContextManager::get_or_register(const TUniqueId& fragment_id) {
@@ -312,21 +306,6 @@ void FragmentContextManager::unregister(const TUniqueId& fragment_id) {
ExecEnv::GetInstance()->profile_report_worker()->unregister_pipeline_load(it->second->query_id(),
fragment_id);
}
const auto& stream_load_contexts = it->second->_stream_load_contexts;

if (stream_load_contexts.size() > 0) {
for (const auto& stream_load_context : stream_load_contexts) {
if (stream_load_context->body_sink) {
Status st;
stream_load_context->body_sink->cancel(st);
}
if (it->second->_channel_stream_load) {
it->second->_runtime_state->exec_env()->stream_context_mgr()->remove_channel_context(
stream_load_context);
}
}
it->second->_stream_load_contexts.resize(0);
}
_fragment_contexts.erase(it);
}
}
@@ -410,4 +389,15 @@ Status FragmentContext::submit_active_drivers(DriverExecutor* executor) {
return Status::OK();
}

void FragmentContext::_close_stream_load_contexts() {
for (const auto& context : _stream_load_contexts) {
context->body_sink->cancel(Status::Cancelled("Close the stream load pipe"));
if (context->enable_batch_write) {
_runtime_state->exec_env()->batch_write_mgr()->unregister_stream_load_pipe(context);
} else {
_runtime_state->exec_env()->stream_context_mgr()->remove_channel_context(context);
}
}
}

} // namespace starrocks::pipeline
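
With `_close_stream_load_contexts()` invoked from the destructor, pipe cancellation and context unregistration now run on every exit path instead of being duplicated in `cancel()` and `FragmentContextManager::unregister()`. A stripped-down sketch of destructor-owned cleanup with stub types; the real dispatch on `enable_batch_write` is the one shown in the hunk above:

```cpp
#include <utility>
#include <vector>

// Stand-in: the real StreamLoadContext carries the pipe (body_sink) and the
// enable_batch_write flag that picks which manager to unregister from.
struct StreamLoadContext {
    bool enable_batch_write = false;
    bool closed = false;
};

class FragmentContextSketch {
public:
    void set_stream_load_contexts(std::vector<StreamLoadContext*> contexts) {
        _contexts = std::move(contexts);
    }

    ~FragmentContextSketch() {
        // Single teardown point: runs whether the fragment finished cleanly,
        // failed, or was cancelled mid-flight. The real code cancels
        // body_sink, then unregisters from batch_write_mgr() or
        // stream_context_mgr() depending on enable_batch_write.
        for (StreamLoadContext* ctx : _contexts) {
            ctx->closed = true;
        }
    }

private:
    std::vector<StreamLoadContext*> _contexts;
};
```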
3 changes: 2 additions & 1 deletion be/src/exec/pipeline/fragment_context.h
@@ -172,6 +172,8 @@ class FragmentContext {
void set_report_when_finish(bool report) { _report_when_finish = report; }

private:
void _close_stream_load_contexts();

bool _enable_group_execution = false;
// Id of this query
TUniqueId _query_id;
@@ -209,7 +211,6 @@ class FragmentContext {
query_cache::CacheParam _cache_param;
bool _enable_cache = false;
std::vector<StreamLoadContext*> _stream_load_contexts;
bool _channel_stream_load = false;

// STREAM MV
std::atomic<size_t> _num_finished_epoch_pipelines = 0;
46 changes: 38 additions & 8 deletions be/src/exec/pipeline/fragment_executor.cpp
@@ -39,6 +39,7 @@
#include "exec/workgroup/work_group.h"
#include "gutil/casts.h"
#include "gutil/map_util.h"
#include "runtime/batch_write/batch_write_mgr.h"
#include "runtime/data_stream_mgr.h"
#include "runtime/data_stream_sender.h"
#include "runtime/descriptors.h"
@@ -86,6 +87,7 @@ const TDataSink& UnifiedExecPlanFragmentParams::output_sink() const {
/// FragmentExecutor.
FragmentExecutor::FragmentExecutor() {
_fragment_start_time = MonotonicNanos();
LOG(INFO) << "create FragmentExecutor";
}

Status FragmentExecutor::_prepare_query_ctx(ExecEnv* exec_env, const UnifiedExecPlanFragmentParams& request) {
@@ -619,6 +621,21 @@ Status FragmentExecutor::_prepare_stream_load_pipe(ExecEnv* exec_env, const Unif
return Status::OK();
}
std::vector<StreamLoadContext*> stream_load_contexts;

bool success = false;
DeferOp defer_op([&] {
if (!success) {
for (auto& ctx : stream_load_contexts) {
ctx->body_sink->cancel(Status::Cancelled("Failed to prepare stream load pipe"));
if (ctx->enable_batch_write) {
exec_env->batch_write_mgr()->unregister_stream_load_pipe(ctx);
} else {
exec_env->stream_context_mgr()->remove_channel_context(ctx);
}
}
}
});

for (; iter != scan_range_map.end(); iter++) {
for (; iter2 != iter->second.end(); iter2++) {
for (const auto& scan_range : iter2->second) {
@@ -630,19 +647,30 @@ Status FragmentExecutor::_prepare_stream_load_pipe(ExecEnv* exec_env, const Unif
TFileFormatType::type format = broker_scan_range.ranges[0].format_type;
TUniqueId load_id = broker_scan_range.ranges[0].load_id;
long txn_id = broker_scan_range.params.txn_id;
bool is_batch_write =
broker_scan_range.__isset.enable_batch_write && broker_scan_range.enable_batch_write;
StreamLoadContext* ctx = nullptr;
RETURN_IF_ERROR(exec_env->stream_context_mgr()->create_channel_context(
exec_env, label, channel_id, db_name, table_name, format, ctx, load_id, txn_id));
DeferOp op([&] {
if (ctx->unref()) {
delete ctx;
}
});
RETURN_IF_ERROR(exec_env->stream_context_mgr()->put_channel_context(label, channel_id, ctx));
if (is_batch_write) {
ASSIGN_OR_RETURN(ctx, BatchWriteMgr::create_and_register_context(
exec_env, db_name, table_name, label, txn_id, load_id,
broker_scan_range.batch_write_interval_ms,
broker_scan_range.batch_write_parameters));
} else {
RETURN_IF_ERROR(exec_env->stream_context_mgr()->create_channel_context(
exec_env, label, channel_id, db_name, table_name, format, ctx, load_id, txn_id));
DeferOp op([&] {
if (ctx->unref()) {
delete ctx;
}
});
RETURN_IF_ERROR(exec_env->stream_context_mgr()->put_channel_context(label, channel_id, ctx));
}
stream_load_contexts.push_back(ctx);
}
}
}

success = true;
_fragment_ctx->set_stream_load_contexts(stream_load_contexts);
return Status::OK();
}
@@ -772,6 +800,7 @@ Status FragmentExecutor::prepare(ExecEnv* exec_env, const TExecPlanFragmentParam
const TExecPlanFragmentParams& unique_request) {
DCHECK(common_request.__isset.desc_tbl);
DCHECK(common_request.__isset.fragment);
LOG(INFO) << "run FragmentExecutor::prepare";

UnifiedExecPlanFragmentParams request(common_request, unique_request);

@@ -871,6 +900,7 @@
}

Status FragmentExecutor::execute(ExecEnv* exec_env) {
LOG(INFO) << "run FragmentExecutor::execute";
bool prepare_success = false;
DeferOp defer([this, &prepare_success]() {
if (!prepare_success) {
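
The `success` flag plus `DeferOp` gives all-or-nothing registration: if any scan range fails to yield a context, everything registered so far is unwound. A self-contained sketch of the pattern with a stand-in scope guard (the real `DeferOp` lives in the BE's `util/defer_op.h`):

```cpp
#include <functional>
#include <initializer_list>
#include <utility>
#include <vector>

// Stand-in for the BE's DeferOp: runs a callback when the scope exits.
class DeferOp {
public:
    explicit DeferOp(std::function<void()> fn) : _fn(std::move(fn)) {}
    ~DeferOp() { _fn(); }

private:
    std::function<void()> _fn;
};

bool prepare_all(std::vector<int>& registered) {
    bool success = false;
    DeferOp cleanup([&] {
        // Fires on every exit path; a no-op once success is set, so only
        // partial registrations are unwound.
        if (!success) {
            registered.clear();
        }
    });
    for (int id : {1, 2, 3}) {
        registered.push_back(id);
        // A failed registration would just `return false;` here and let
        // the guard clean up everything pushed so far.
    }
    success = true;
    return true;
}
```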
3 changes: 0 additions & 3 deletions be/src/exec/pipeline/scan/connector_scan_operator.cpp
@@ -633,9 +633,6 @@ ConnectorChunkSource::ConnectorChunkSource(ScanOperator* op, RuntimeProfile* run
TScanRange* scan_range = scan_morsel->get_scan_range();
ScanSplitContext* split_context = scan_morsel->get_split_context();

if (scan_range->__isset.broker_scan_range) {
scan_range->broker_scan_range.params.__set_non_blocking_read(true);
}
_data_source = scan_node->data_source_provider()->create_data_source(*scan_range);
_data_source->set_driver_sequence(op->get_driver_sequence());
_data_source->set_split_context(split_context);
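
Taken together with the `connector_scan_node.cpp` and `file_scanner.cpp` hunks, this retires the per-scan-node `non_blocking_read` toggle entirely; `StreamLoadPipeInputStream` is now constructed from the pipe alone, presumably because the time-bounded, non-blocking behavior lives in the pipe itself (e.g. a `TimeBoundedStreamLoadPipe`). A before/after sketch of the call site, with the "after" line taken from the `file_scanner.cpp` hunk:

```cpp
// Before: each scan node decided blocking behavior via a thrift flag.
// auto stream = std::make_shared<StreamLoadPipeInputStream>(std::move(pipe),
//                                                           non_blocking_read);

// After: the stream takes only the pipe; blocking behavior is the pipe's.
auto stream = std::make_shared<StreamLoadPipeInputStream>(std::move(pipe));
```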