From a0b2a63e06b24db6f132444a8b059c2d96e461a5 Mon Sep 17 00:00:00 2001 From: stdpain <34912776+stdpain@users.noreply.github.com> Date: Tue, 31 Dec 2024 16:16:32 +0800 Subject: [PATCH] [Feature] support event scheduler for scan/exchange/agg (part 2) (#54338) Signed-off-by: stdpain --- be/src/exec/CMakeLists.txt | 1 + be/src/exec/aggregator.cpp | 2 + be/src/exec/aggregator.h | 12 +++ be/src/exec/chunk_buffer_memory_manager.h | 22 +++- .../aggregate_blocking_sink_operator.cpp | 2 + .../aggregate_blocking_sink_operator.h | 2 + .../aggregate_blocking_source_operator.cpp | 7 ++ .../aggregate_blocking_source_operator.h | 1 + ...regate_distinct_blocking_sink_operator.cpp | 5 +- ...ggregate_distinct_blocking_sink_operator.h | 2 + ...gate_distinct_blocking_source_operator.cpp | 6 ++ ...regate_distinct_blocking_source_operator.h | 1 + ...egate_distinct_streaming_sink_operator.cpp | 2 + ...gregate_distinct_streaming_sink_operator.h | 2 + ...ate_distinct_streaming_source_operator.cpp | 11 ++ ...egate_distinct_streaming_source_operator.h | 3 + .../aggregate_streaming_sink_operator.cpp | 14 ++- .../aggregate_streaming_sink_operator.h | 4 + .../aggregate_streaming_source_operator.cpp | 8 ++ .../aggregate_streaming_source_operator.h | 3 + ...illable_aggregate_blocking_sink_operator.h | 2 + ...lable_aggregate_blocking_source_operator.h | 2 + ...ble_aggregate_distinct_blocking_operator.h | 4 + .../exchange/exchange_sink_operator.cpp | 2 +- .../exchange/exchange_sink_operator.h | 2 + .../exchange/exchange_source_operator.cpp | 7 ++ .../exchange/exchange_source_operator.h | 6 +- .../exec/pipeline/exchange/local_exchange.h | 31 ++++-- .../exchange/local_exchange_sink_operator.cpp | 7 ++ .../exchange/local_exchange_sink_operator.h | 2 + .../local_exchange_source_operator.cpp | 33 ++++-- .../exchange/local_exchange_source_operator.h | 9 +- be/src/exec/pipeline/exchange/sink_buffer.cpp | 12 ++- be/src/exec/pipeline/exchange/sink_buffer.h | 15 ++- be/src/exec/pipeline/fragment_context.cpp | 65 +++++++++++ be/src/exec/pipeline/fragment_context.h | 17 ++- be/src/exec/pipeline/fragment_executor.cpp | 35 +++++- be/src/exec/pipeline/pipeline.cpp | 7 +- .../pipeline/pipeline_driver_executor.cpp | 5 + .../exec/pipeline/pipeline_driver_poller.cpp | 7 ++ .../exec/pipeline/pipeline_driver_queue.cpp | 1 + be/src/exec/pipeline/result_sink_operator.cpp | 2 + be/src/exec/pipeline/result_sink_operator.h | 2 + .../pipeline/scan/chunk_buffer_limiter.cpp | 9 +- .../exec/pipeline/scan/chunk_buffer_limiter.h | 20 ++-- be/src/exec/pipeline/scan/chunk_source.cpp | 3 + .../exec/pipeline/scan/olap_scan_context.cpp | 7 +- be/src/exec/pipeline/scan/olap_scan_context.h | 20 +++- .../exec/pipeline/scan/olap_scan_operator.cpp | 21 +++- .../exec/pipeline/scan/olap_scan_operator.h | 4 + .../scan/olap_scan_prepare_operator.cpp | 1 + .../scan/olap_scan_prepare_operator.h | 2 + be/src/exec/pipeline/scan/scan_operator.cpp | 9 +- be/src/exec/pipeline/scan/scan_operator.h | 21 ++++ be/src/exec/pipeline/schedule/observer.cpp | 10 +- .../exec/pipeline/schedule/timeout_tasks.cpp | 42 ++++++++ be/src/exec/pipeline/schedule/timeout_tasks.h | 46 ++++++++ .../sink/blackhole_table_sink_operator.h | 2 + be/src/exec/pipeline/source_operator.h | 22 ++++ be/src/exec/pipeline/stream_epoch_manager.h | 3 +- be/src/runtime/buffer_control_block.cpp | 2 + be/src/runtime/buffer_control_block.h | 26 +++++ be/src/runtime/data_stream_mgr.cpp | 1 + be/src/runtime/data_stream_mgr.h | 1 - be/src/runtime/data_stream_recvr.cpp | 25 +++-- be/src/runtime/data_stream_recvr.h | 21 ++++ be/src/runtime/exec_env.cpp | 5 + be/src/runtime/exec_env.h | 5 + be/src/runtime/sender_queue.cpp | 2 + test/sql/test_pipeline/R/test_event_scheduler | 101 ++++++++++++++++++ test/sql/test_pipeline/T/test_event_scheduler | 65 +++++++++++ 71 files changed, 822 insertions(+), 59 deletions(-) create mode 100644 be/src/exec/pipeline/schedule/timeout_tasks.cpp create mode 100644 be/src/exec/pipeline/schedule/timeout_tasks.h create mode 100644 test/sql/test_pipeline/R/test_event_scheduler create mode 100644 test/sql/test_pipeline/T/test_event_scheduler diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 2e3b373adbbf9..c2a3bd44cd82f 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -286,6 +286,7 @@ set(EXEC_FILES pipeline/group_execution/execution_group.cpp pipeline/group_execution/execution_group_builder.cpp pipeline/group_execution/group_operator.cpp + pipeline/schedule/timeout_tasks.cpp pipeline/schedule/event_scheduler.cpp pipeline/schedule/observer.cpp pipeline/schedule/pipeline_timer.cpp diff --git a/be/src/exec/aggregator.cpp b/be/src/exec/aggregator.cpp index a14f33672f018..84d69bfed3296 100644 --- a/be/src/exec/aggregator.cpp +++ b/be/src/exec/aggregator.cpp @@ -729,10 +729,12 @@ bool Aggregator::is_chunk_buffer_empty() { } ChunkPtr Aggregator::poll_chunk_buffer() { + auto notify = defer_notify_sink(); return _limited_buffer->pull(); } void Aggregator::offer_chunk_to_buffer(const ChunkPtr& chunk) { + auto notify = defer_notify_source(); _limited_buffer->push(chunk); } diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h index 555bdf6e7663e..d92125664f8b6 100644 --- a/be/src/exec/aggregator.h +++ b/be/src/exec/aggregator.h @@ -34,6 +34,7 @@ #include "exec/chunk_buffer_memory_manager.h" #include "exec/limited_pipeline_chunk_buffer.h" #include "exec/pipeline/context_with_dependency.h" +#include "exec/pipeline/schedule/observer.h" #include "exec/pipeline/spill_process_channel.h" #include "exprs/agg/aggregate_factory.h" #include "exprs/expr.h" @@ -402,6 +403,15 @@ class Aggregator : public pipeline::ContextWithDependency { HashTableKeyAllocator _state_allocator; + void attach_sink_observer(RuntimeState* state, pipeline::PipelineObserver* observer) { + _pip_observable.attach_sink_observer(state, observer); + } + void attach_source_observer(RuntimeState* state, pipeline::PipelineObserver* observer) { + _pip_observable.attach_source_observer(state, observer); + } + auto defer_notify_source() { return _pip_observable.defer_notify_source(); } + auto defer_notify_sink() { return _pip_observable.defer_notify_sink(); } + protected: AggregatorParamsPtr _params; @@ -510,6 +520,8 @@ class Aggregator : public pipeline::ContextWithDependency { // aggregate combinator functions since they are not persisted in agg hash map std::vector _combinator_function; + pipeline::PipeObservable _pip_observable; + public: void build_hash_map(size_t chunk_size, bool agg_group_by_with_limit = false); void build_hash_map(size_t chunk_size, std::atomic& shared_limit_countdown, bool agg_group_by_with_limit); diff --git a/be/src/exec/chunk_buffer_memory_manager.h b/be/src/exec/chunk_buffer_memory_manager.h index 8c8dde506f2a7..2936c43dbcd05 100644 --- a/be/src/exec/chunk_buffer_memory_manager.h +++ b/be/src/exec/chunk_buffer_memory_manager.h @@ -41,8 +41,17 @@ class ChunkBufferMemoryManager { } void update_memory_usage(int64_t memory_usage, int64_t num_rows) { - _memory_usage += memory_usage; - _buffered_num_rows += num_rows; + bool prev_full = is_full(); + size_t prev_memusage = _memory_usage.fetch_add(memory_usage); + size_t prev_num_rows = _buffered_num_rows.fetch_add(num_rows); + bool is_full = + prev_memusage + memory_usage >= _max_memory_usage || prev_num_rows + num_rows >= _max_buffered_rows; + bool expect = false; + bool full_changed = prev_full != is_full; + if (!full_changed) { + return; + } + _full_events_changed.compare_exchange_strong(expect, full_changed); } size_t get_memory_limit_per_driver() const { return _max_memory_usage_per_driver; } @@ -65,6 +74,14 @@ class ChunkBufferMemoryManager { _buffered_num_rows = 0; } + bool full_events_changed() { + if (!_full_events_changed.load(std::memory_order_acquire)) { + return false; + } + bool val = true; + return _full_events_changed.compare_exchange_strong(val, false); + } + private: std::atomic _max_memory_usage{128UL * 1024 * 1024 * 1024}; // 128GB size_t _max_memory_usage_per_driver = 128 * 1024 * 1024UL; // 128MB @@ -72,5 +89,6 @@ class ChunkBufferMemoryManager { std::atomic _memory_usage{}; std::atomic _buffered_num_rows{}; size_t _max_input_dop; + std::atomic _full_events_changed{}; }; } // namespace starrocks::pipeline diff --git a/be/src/exec/pipeline/aggregate/aggregate_blocking_sink_operator.cpp b/be/src/exec/pipeline/aggregate/aggregate_blocking_sink_operator.cpp index b93db5090ab03..01a4c928e0700 100644 --- a/be/src/exec/pipeline/aggregate/aggregate_blocking_sink_operator.cpp +++ b/be/src/exec/pipeline/aggregate/aggregate_blocking_sink_operator.cpp @@ -33,6 +33,7 @@ Status AggregateBlockingSinkOperator::prepare(RuntimeState* state) { _aggregator->limit() != -1 && // has limit _aggregator->conjunct_ctxs().empty() && // no 'having' clause _aggregator->get_aggr_phase() == AggrPhase2); // phase 2, keep it to make things safe + _aggregator->attach_sink_observer(state, this->_observer); return Status::OK(); } @@ -46,6 +47,7 @@ void AggregateBlockingSinkOperator::close(RuntimeState* state) { Status AggregateBlockingSinkOperator::set_finishing(RuntimeState* state) { if (_is_finished) return Status::OK(); ONCE_DETECT(_set_finishing_once); + auto notify = _aggregator->defer_notify_source(); auto defer = DeferOp([this]() { COUNTER_UPDATE(_aggregator->input_row_count(), _aggregator->num_input_rows()); _aggregator->sink_complete(); diff --git a/be/src/exec/pipeline/aggregate/aggregate_blocking_sink_operator.h b/be/src/exec/pipeline/aggregate/aggregate_blocking_sink_operator.h index e15c1ad74defa..623e152e456bd 100644 --- a/be/src/exec/pipeline/aggregate/aggregate_blocking_sink_operator.h +++ b/be/src/exec/pipeline/aggregate/aggregate_blocking_sink_operator.h @@ -75,6 +75,8 @@ class AggregateBlockingSinkOperatorFactory final : public OperatorFactory { ~AggregateBlockingSinkOperatorFactory() override = default; + bool support_event_scheduler() const override { return true; } + Status prepare(RuntimeState* state) override; OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override; diff --git a/be/src/exec/pipeline/aggregate/aggregate_blocking_source_operator.cpp b/be/src/exec/pipeline/aggregate/aggregate_blocking_source_operator.cpp index 20b98d4e88212..eaafb29d001df 100644 --- a/be/src/exec/pipeline/aggregate/aggregate_blocking_source_operator.cpp +++ b/be/src/exec/pipeline/aggregate/aggregate_blocking_source_operator.cpp @@ -29,6 +29,7 @@ bool AggregateBlockingSourceOperator::is_finished() const { } Status AggregateBlockingSourceOperator::set_finished(RuntimeState* state) { + auto notify = _aggregator->defer_notify_sink(); return _aggregator->set_finished(); } @@ -37,6 +38,12 @@ void AggregateBlockingSourceOperator::close(RuntimeState* state) { SourceOperator::close(state); } +Status AggregateBlockingSourceOperator::prepare(RuntimeState* state) { + RETURN_IF_ERROR(SourceOperator::prepare(state)); + _aggregator->attach_source_observer(state, this->_observer); + return Status::OK(); +} + StatusOr AggregateBlockingSourceOperator::pull_chunk(RuntimeState* state) { RETURN_IF_CANCELLED(state); diff --git a/be/src/exec/pipeline/aggregate/aggregate_blocking_source_operator.h b/be/src/exec/pipeline/aggregate/aggregate_blocking_source_operator.h index b537ca71999d1..e12af22804f8c 100644 --- a/be/src/exec/pipeline/aggregate/aggregate_blocking_source_operator.h +++ b/be/src/exec/pipeline/aggregate/aggregate_blocking_source_operator.h @@ -34,6 +34,7 @@ class AggregateBlockingSourceOperator : public SourceOperator { bool has_output() const override; bool is_finished() const override; + Status prepare(RuntimeState* state) override; Status set_finished(RuntimeState* state) override; diff --git a/be/src/exec/pipeline/aggregate/aggregate_distinct_blocking_sink_operator.cpp b/be/src/exec/pipeline/aggregate/aggregate_distinct_blocking_sink_operator.cpp index 6b1495df2b7cb..314dabe6ef2ae 100644 --- a/be/src/exec/pipeline/aggregate/aggregate_distinct_blocking_sink_operator.cpp +++ b/be/src/exec/pipeline/aggregate/aggregate_distinct_blocking_sink_operator.cpp @@ -22,7 +22,9 @@ namespace starrocks::pipeline { Status AggregateDistinctBlockingSinkOperator::prepare(RuntimeState* state) { RETURN_IF_ERROR(Operator::prepare(state)); RETURN_IF_ERROR(_aggregator->prepare(state, state->obj_pool(), _unique_metrics.get())); - return _aggregator->open(state); + RETURN_IF_ERROR(_aggregator->open(state)); + _aggregator->attach_sink_observer(state, this->_observer); + return Status::OK(); } void AggregateDistinctBlockingSinkOperator::close(RuntimeState* state) { @@ -35,6 +37,7 @@ void AggregateDistinctBlockingSinkOperator::close(RuntimeState* state) { Status AggregateDistinctBlockingSinkOperator::set_finishing(RuntimeState* state) { if (_is_finished) return Status::OK(); ONCE_DETECT(_set_finishing_once); + auto notify = _aggregator->defer_notify_source(); auto defer = DeferOp([this]() { COUNTER_UPDATE(_aggregator->input_row_count(), _aggregator->num_input_rows()); _aggregator->sink_complete(); diff --git a/be/src/exec/pipeline/aggregate/aggregate_distinct_blocking_sink_operator.h b/be/src/exec/pipeline/aggregate/aggregate_distinct_blocking_sink_operator.h index 62066986b2631..4a56f1694ed2f 100644 --- a/be/src/exec/pipeline/aggregate/aggregate_distinct_blocking_sink_operator.h +++ b/be/src/exec/pipeline/aggregate/aggregate_distinct_blocking_sink_operator.h @@ -73,6 +73,8 @@ class AggregateDistinctBlockingSinkOperatorFactory final : public OperatorFactor ~AggregateDistinctBlockingSinkOperatorFactory() override = default; + bool support_event_scheduler() const override { return true; } + Status prepare(RuntimeState* state) override { RETURN_IF_ERROR(OperatorFactory::prepare(state)); return Status::OK(); diff --git a/be/src/exec/pipeline/aggregate/aggregate_distinct_blocking_source_operator.cpp b/be/src/exec/pipeline/aggregate/aggregate_distinct_blocking_source_operator.cpp index dec7115d9d037..b679f0e8213d4 100644 --- a/be/src/exec/pipeline/aggregate/aggregate_distinct_blocking_source_operator.cpp +++ b/be/src/exec/pipeline/aggregate/aggregate_distinct_blocking_source_operator.cpp @@ -35,6 +35,12 @@ void AggregateDistinctBlockingSourceOperator::close(RuntimeState* state) { SourceOperator::close(state); } +Status AggregateDistinctBlockingSourceOperator::prepare(RuntimeState* state) { + RETURN_IF_ERROR(SourceOperator::prepare(state)); + _aggregator->attach_source_observer(state, this->_observer); + return Status::OK(); +} + StatusOr AggregateDistinctBlockingSourceOperator::pull_chunk(RuntimeState* state) { RETURN_IF_CANCELLED(state); diff --git a/be/src/exec/pipeline/aggregate/aggregate_distinct_blocking_source_operator.h b/be/src/exec/pipeline/aggregate/aggregate_distinct_blocking_source_operator.h index f2fe6179a1cc2..299cdadda70be 100644 --- a/be/src/exec/pipeline/aggregate/aggregate_distinct_blocking_source_operator.h +++ b/be/src/exec/pipeline/aggregate/aggregate_distinct_blocking_source_operator.h @@ -34,6 +34,7 @@ class AggregateDistinctBlockingSourceOperator : public SourceOperator { bool has_output() const override; bool is_finished() const override; + Status prepare(RuntimeState* state) override; Status set_finished(RuntimeState* state) override; diff --git a/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_sink_operator.cpp b/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_sink_operator.cpp index ec3eeec18219d..3fb70ea847664 100644 --- a/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_sink_operator.cpp +++ b/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_sink_operator.cpp @@ -26,6 +26,7 @@ Status AggregateDistinctStreamingSinkOperator::prepare(RuntimeState* state) { if (_aggregator->streaming_preaggregation_mode() == TStreamingPreaggregationMode::LIMITED_MEM) { _limited_mem_state.limited_memory_size = config::streaming_agg_limited_memory_size; } + _aggregator->attach_sink_observer(state, this->_observer); return _aggregator->open(state); } @@ -37,6 +38,7 @@ void AggregateDistinctStreamingSinkOperator::close(RuntimeState* state) { } Status AggregateDistinctStreamingSinkOperator::set_finishing(RuntimeState* state) { + auto notify = _aggregator->defer_notify_source(); _is_finished = true; // skip processing if cancelled diff --git a/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_sink_operator.h b/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_sink_operator.h index ee9f4026d0a34..f839509dee664 100644 --- a/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_sink_operator.h +++ b/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_sink_operator.h @@ -83,6 +83,8 @@ class AggregateDistinctStreamingSinkOperatorFactory final : public OperatorFacto ~AggregateDistinctStreamingSinkOperatorFactory() override = default; + bool support_event_scheduler() const override { return true; } + OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override { return std::make_shared( this, _id, _plan_node_id, driver_sequence, _aggregator_factory->get_or_create(driver_sequence)); diff --git a/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_source_operator.cpp b/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_source_operator.cpp index caacfab9b8fdf..9d7f7d2394383 100644 --- a/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_source_operator.cpp +++ b/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_source_operator.cpp @@ -14,8 +14,18 @@ #include "aggregate_distinct_streaming_source_operator.h" +#include "common/status.h" +#include "exec/pipeline/source_operator.h" +#include "runtime/runtime_state.h" + namespace starrocks::pipeline { +Status AggregateDistinctStreamingSourceOperator::prepare(RuntimeState* state) { + RETURN_IF_ERROR(SourceOperator::prepare(state)); + _aggregator->attach_source_observer(state, this->_observer); + return Status::OK(); +} + bool AggregateDistinctStreamingSourceOperator::has_output() const { // There are two cases where chunk buffer is not null // case1:streaming mode is 'FORCE_STREAMING' @@ -46,6 +56,7 @@ bool AggregateDistinctStreamingSourceOperator::is_finished() const { } Status AggregateDistinctStreamingSourceOperator::set_finished(RuntimeState* state) { + auto notify = _aggregator->defer_notify_sink(); return _aggregator->set_finished(); } diff --git a/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_source_operator.h b/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_source_operator.h index 1219a84558a09..01cfdd6c18173 100644 --- a/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_source_operator.h +++ b/be/src/exec/pipeline/aggregate/aggregate_distinct_streaming_source_operator.h @@ -31,6 +31,7 @@ class AggregateDistinctStreamingSourceOperator : public SourceOperator { ~AggregateDistinctStreamingSourceOperator() override = default; + Status prepare(RuntimeState* state) override; bool has_output() const override; bool is_finished() const override; @@ -60,6 +61,8 @@ class AggregateDistinctStreamingSourceOperatorFactory final : public SourceOpera ~AggregateDistinctStreamingSourceOperatorFactory() override = default; + bool support_event_scheduler() const override { return true; } + OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override { return std::make_shared( this, _id, _plan_node_id, driver_sequence, _aggregator_factory->get_or_create(driver_sequence)); diff --git a/be/src/exec/pipeline/aggregate/aggregate_streaming_sink_operator.cpp b/be/src/exec/pipeline/aggregate/aggregate_streaming_sink_operator.cpp index d3aa8195ce77b..bfc34bced820c 100644 --- a/be/src/exec/pipeline/aggregate/aggregate_streaming_sink_operator.cpp +++ b/be/src/exec/pipeline/aggregate/aggregate_streaming_sink_operator.cpp @@ -29,7 +29,9 @@ Status AggregateStreamingSinkOperator::prepare(RuntimeState* state) { if (_aggregator->streaming_preaggregation_mode() == TStreamingPreaggregationMode::LIMITED_MEM) { _limited_mem_state.limited_memory_size = config::streaming_agg_limited_memory_size; } - return _aggregator->open(state); + RETURN_IF_ERROR(_aggregator->open(state)); + _aggregator->attach_sink_observer(state, this->_observer); + return Status::OK(); } void AggregateStreamingSinkOperator::close(RuntimeState* state) { @@ -40,8 +42,8 @@ void AggregateStreamingSinkOperator::close(RuntimeState* state) { } Status AggregateStreamingSinkOperator::set_finishing(RuntimeState* state) { + auto notify = _aggregator->defer_notify_source(); _is_finished = true; - // skip processing if cancelled if (state->is_cancelled()) { return Status::OK(); @@ -314,4 +316,12 @@ Status AggregateStreamingSinkOperator::reset_state(RuntimeState* state, const st _is_finished = false; return _aggregator->reset_state(state, refill_chunks, this); } + +std::string AggregateStreamingSinkOperator::get_name() const { + std::string finished = is_finished() ? "X" : "O"; + auto full = _aggregator->is_chunk_buffer_full(); + return fmt::format("{}_{}_{}({}) {{ full:{} has_output:{}}}", _name, _plan_node_id, (void*)this, finished, full, + has_output()); +} + } // namespace starrocks::pipeline diff --git a/be/src/exec/pipeline/aggregate/aggregate_streaming_sink_operator.h b/be/src/exec/pipeline/aggregate/aggregate_streaming_sink_operator.h index e8ff38113fb27..2eba42c087b43 100644 --- a/be/src/exec/pipeline/aggregate/aggregate_streaming_sink_operator.h +++ b/be/src/exec/pipeline/aggregate/aggregate_streaming_sink_operator.h @@ -49,6 +49,8 @@ class AggregateStreamingSinkOperator : public Operator { bool releaseable() const override { return true; } void set_execute_mode(int performance_level) override; + std::string get_name() const override; + private: // Invoked by push_chunk if current mode is TStreamingPreaggregationMode::FORCE_STREAMING Status _push_chunk_by_force_streaming(const ChunkPtr& chunk); @@ -85,6 +87,8 @@ class AggregateStreamingSinkOperatorFactory final : public OperatorFactory { ~AggregateStreamingSinkOperatorFactory() override = default; + bool support_event_scheduler() const override { return true; } + OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override { return std::make_shared(this, _id, _plan_node_id, driver_sequence, _aggregator_factory->get_or_create(driver_sequence)); diff --git a/be/src/exec/pipeline/aggregate/aggregate_streaming_source_operator.cpp b/be/src/exec/pipeline/aggregate/aggregate_streaming_source_operator.cpp index 7259624f781cc..f0b026eee8ae6 100644 --- a/be/src/exec/pipeline/aggregate/aggregate_streaming_source_operator.cpp +++ b/be/src/exec/pipeline/aggregate/aggregate_streaming_source_operator.cpp @@ -53,9 +53,16 @@ bool AggregateStreamingSourceOperator::is_finished() const { } Status AggregateStreamingSourceOperator::set_finished(RuntimeState* state) { + auto notify = _aggregator->defer_notify_sink(); return _aggregator->set_finished(); } +Status AggregateStreamingSourceOperator::prepare(RuntimeState* state) { + RETURN_IF_ERROR(SourceOperator::prepare(state)); + _aggregator->attach_source_observer(state, this->_observer); + return Status::OK(); +} + void AggregateStreamingSourceOperator::close(RuntimeState* state) { _aggregator->unref(state); SourceOperator::close(state); @@ -97,6 +104,7 @@ Status AggregateStreamingSourceOperator::_output_chunk_from_hash_map(ChunkPtr* c } }); + // TODO: notify sink here if (need_reset_aggregator) { if (!_aggregator->is_sink_complete()) { RETURN_IF_ERROR(_aggregator->reset_state(state, {}, nullptr, false)); diff --git a/be/src/exec/pipeline/aggregate/aggregate_streaming_source_operator.h b/be/src/exec/pipeline/aggregate/aggregate_streaming_source_operator.h index 5940e5eed036b..e624ec20d27ce 100644 --- a/be/src/exec/pipeline/aggregate/aggregate_streaming_source_operator.h +++ b/be/src/exec/pipeline/aggregate/aggregate_streaming_source_operator.h @@ -33,6 +33,7 @@ class AggregateStreamingSourceOperator : public SourceOperator { bool has_output() const override; bool is_finished() const override; + Status prepare(RuntimeState* state) override; Status set_finished(RuntimeState* state) override; void close(RuntimeState* state) override; @@ -58,6 +59,8 @@ class AggregateStreamingSourceOperatorFactory final : public SourceOperatorFacto ~AggregateStreamingSourceOperatorFactory() override = default; + bool support_event_scheduler() const override { return true; } + OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override { return std::make_shared(this, _id, _plan_node_id, driver_sequence, _aggregator_factory->get_or_create(driver_sequence)); diff --git a/be/src/exec/pipeline/aggregate/spillable_aggregate_blocking_sink_operator.h b/be/src/exec/pipeline/aggregate/spillable_aggregate_blocking_sink_operator.h index 87788318ba6f2..35af114f4c68b 100644 --- a/be/src/exec/pipeline/aggregate/spillable_aggregate_blocking_sink_operator.h +++ b/be/src/exec/pipeline/aggregate/spillable_aggregate_blocking_sink_operator.h @@ -110,6 +110,8 @@ class SpillableAggregateBlockingSinkOperatorFactory : public OperatorFactory { OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override; + bool support_event_scheduler() const override { return false; } + private: ObjectPool _pool; SortExecExprs _sort_exprs; diff --git a/be/src/exec/pipeline/aggregate/spillable_aggregate_blocking_source_operator.h b/be/src/exec/pipeline/aggregate/spillable_aggregate_blocking_source_operator.h index 6c2e061ae9ac4..2e9f690bf8d1b 100644 --- a/be/src/exec/pipeline/aggregate/spillable_aggregate_blocking_source_operator.h +++ b/be/src/exec/pipeline/aggregate/spillable_aggregate_blocking_source_operator.h @@ -68,6 +68,8 @@ class SpillableAggregateBlockingSourceOperatorFactory final : public SourceOpera OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override; + bool support_event_scheduler() const override { return false; } + private: AggregatorFactoryPtr _hash_aggregator_factory; // _stream_aggregatory_factory is only used when spilling happens diff --git a/be/src/exec/pipeline/aggregate/spillable_aggregate_distinct_blocking_operator.h b/be/src/exec/pipeline/aggregate/spillable_aggregate_distinct_blocking_operator.h index 8981e25ed5068..99b4e120a60a7 100644 --- a/be/src/exec/pipeline/aggregate/spillable_aggregate_distinct_blocking_operator.h +++ b/be/src/exec/pipeline/aggregate/spillable_aggregate_distinct_blocking_operator.h @@ -88,6 +88,8 @@ class SpillableAggregateDistinctBlockingSinkOperatorFactory final : public Opera OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override; + bool support_event_scheduler() const override { return false; } + private: ObjectPool _pool; SortExecExprs _sort_exprs; @@ -144,6 +146,8 @@ class SpillableAggregateDistinctBlockingSourceOperatorFactory final : public Sou OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override; + bool support_event_scheduler() const override { return false; } + private: AggregatorFactoryPtr _hash_aggregator_factory; // _stream_aggregatory_factory is only used when spilling happens diff --git a/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp b/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp index cf53f164d2000..b4fd49eb39b70 100644 --- a/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp +++ b/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp @@ -455,7 +455,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) { _shuffle_channel_ids.resize(state->chunk_size()); _row_indexes.resize(state->chunk_size()); - + _buffer->attach_observer(state, observer()); return Status::OK(); } diff --git a/be/src/exec/pipeline/exchange/exchange_sink_operator.h b/be/src/exec/pipeline/exchange/exchange_sink_operator.h index 59885e4e0e2f9..f873f34f0af21 100644 --- a/be/src/exec/pipeline/exchange/exchange_sink_operator.h +++ b/be/src/exec/pipeline/exchange/exchange_sink_operator.h @@ -224,6 +224,8 @@ class ExchangeSinkOperatorFactory final : public OperatorFactory { ~ExchangeSinkOperatorFactory() override = default; + bool support_event_scheduler() const override { return true; } + OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override; Status prepare(RuntimeState* state) override; diff --git a/be/src/exec/pipeline/exchange/exchange_source_operator.cpp b/be/src/exec/pipeline/exchange/exchange_source_operator.cpp index 1031b904d1bf3..66f7c42da7e69 100644 --- a/be/src/exec/pipeline/exchange/exchange_source_operator.cpp +++ b/be/src/exec/pipeline/exchange/exchange_source_operator.cpp @@ -26,6 +26,8 @@ Status ExchangeSourceOperator::prepare(RuntimeState* state) { RETURN_IF_ERROR(SourceOperator::prepare(state)); _stream_recvr = static_cast(_factory)->create_stream_recvr(state); _stream_recvr->bind_profile(_driver_sequence, _unique_metrics); + _stream_recvr->attach_observer(state, observer()); + _stream_recvr->attach_query_ctx(state->query_ctx()); return Status::OK(); } @@ -52,6 +54,11 @@ StatusOr ExchangeSourceOperator::pull_chunk(RuntimeState* state) { return std::move(chunk); } +std::string ExchangeSourceOperator::get_name() const { + std::string finished = is_finished() ? "X" : "O"; + return fmt::format("{}_{}_{}({}) {{ has_output:{}}}", _name, _plan_node_id, (void*)this, finished, has_output()); +} + ExchangeSourceOperatorFactory::~ExchangeSourceOperatorFactory() { if (_stream_recvr != nullptr && _stream_recvr_cnt != 0) { // NOTE: it is possible that the ExchangeSourceOperator::prepare() is called, but the ExchangeSourceOperator::set_finishing() diff --git a/be/src/exec/pipeline/exchange/exchange_source_operator.h b/be/src/exec/pipeline/exchange/exchange_source_operator.h index e3cb6bd7c0fab..d06086dacee0a 100644 --- a/be/src/exec/pipeline/exchange/exchange_source_operator.h +++ b/be/src/exec/pipeline/exchange/exchange_source_operator.h @@ -39,6 +39,8 @@ class ExchangeSourceOperator : public SourceOperator { StatusOr pull_chunk(RuntimeState* state) override; + std::string get_name() const override; + private: std::shared_ptr _stream_recvr = nullptr; std::atomic _is_finishing = false; @@ -54,7 +56,9 @@ class ExchangeSourceOperatorFactory final : public SourceOperatorFactory { _row_desc(row_desc), _enable_pipeline_level_shuffle(enable_pipeline_level_shuffle) {} - virtual ~ExchangeSourceOperatorFactory(); + ~ExchangeSourceOperatorFactory() override; + + bool support_event_scheduler() const override { return true; } const TExchangeNode& texchange_node() { return _texchange_node; } diff --git a/be/src/exec/pipeline/exchange/local_exchange.h b/be/src/exec/pipeline/exchange/local_exchange.h index 819367dbfb19c..f2644f4500771 100644 --- a/be/src/exec/pipeline/exchange/local_exchange.h +++ b/be/src/exec/pipeline/exchange/local_exchange.h @@ -111,7 +111,9 @@ class LocalExchanger { public: explicit LocalExchanger(std::string name, std::shared_ptr memory_manager, LocalExchangeSourceOperatorFactory* source) - : _name(std::move(name)), _memory_manager(std::move(memory_manager)), _source(source) {} + : _name(std::move(name)), _memory_manager(std::move(memory_manager)), _source(source) { + source->set_exchanger(this); + } virtual ~LocalExchanger() = default; @@ -131,14 +133,9 @@ class LocalExchanger { } // All LocalExchangeSourceOperators have finished. - virtual bool is_all_sources_finished() const { - for (const auto& source_op : _source->get_sources()) { - if (!source_op->is_finished()) { - return false; - } - } - return true; - } + bool is_all_sources_finished() const { return _finished_source_number == _source->get_sources().size(); } + + void finish_source() { _finished_source_number++; } void epoch_finish(RuntimeState* state) { if (incr_epoch_finished_sinker() == _sink_number) { @@ -163,14 +160,30 @@ class LocalExchanger { size_t get_memory_usage() const { return _memory_manager->get_memory_usage(); } + void attach_sink_observer(RuntimeState* state, pipeline::PipelineObserver* observer) { + _sink_observable.add_observer(state, observer); + } + + auto defer_notify_sink() { + return DeferOp([this]() { + if (_memory_manager->full_events_changed() || is_all_sources_finished()) { + _sink_observable.notify_sink_observers(); + } + }); + } + protected: const std::string _name; std::shared_ptr _memory_manager; std::atomic _sink_number = 0; + std::atomic _finished_source_number = 0; LocalExchangeSourceOperatorFactory* _source; // Stream MV std::atomic _epoch_finished_sinker = 0; + +private: + Observable _sink_observable; }; // Exchange the local data for shuffle diff --git a/be/src/exec/pipeline/exchange/local_exchange_sink_operator.cpp b/be/src/exec/pipeline/exchange/local_exchange_sink_operator.cpp index 7b8219be654e5..fac6af7ea53c1 100644 --- a/be/src/exec/pipeline/exchange/local_exchange_sink_operator.cpp +++ b/be/src/exec/pipeline/exchange/local_exchange_sink_operator.cpp @@ -16,6 +16,7 @@ #include "column/chunk.h" #include "runtime/runtime_state.h" +#include "util/defer_op.h" namespace starrocks::pipeline { @@ -27,6 +28,7 @@ Status LocalExchangeSinkOperator::prepare(RuntimeState* state) { _peak_memory_usage_counter = _unique_metrics->AddHighWaterMarkCounter( "LocalExchangePeakMemoryUsage", TUnit::BYTES, RuntimeProfile::Counter::create_strategy(TUnit::BYTES, TCounterMergeType::SKIP_FIRST_MERGE)); + _exchanger->attach_sink_observer(state, this->observer()); return Status::OK(); } @@ -50,6 +52,11 @@ Status LocalExchangeSinkOperator::push_chunk(RuntimeState* state, const ChunkPtr return res; } +std::string LocalExchangeSinkOperator::get_name() const { + std::string finished = is_finished() ? "X" : "O"; + return fmt::format("{}_{}_{}({}) {{ need_input:{}}}", _name, _plan_node_id, (void*)this, finished, need_input()); +} + /// LocalExchangeSinkOperatorFactory. Status LocalExchangeSinkOperatorFactory::prepare(RuntimeState* state) { RETURN_IF_ERROR(OperatorFactory::prepare(state)); diff --git a/be/src/exec/pipeline/exchange/local_exchange_sink_operator.h b/be/src/exec/pipeline/exchange/local_exchange_sink_operator.h index 16cd382920ed9..af54bd3f657b1 100644 --- a/be/src/exec/pipeline/exchange/local_exchange_sink_operator.h +++ b/be/src/exec/pipeline/exchange/local_exchange_sink_operator.h @@ -64,6 +64,8 @@ class LocalExchangeSinkOperator final : public Operator { void update_exec_stats(RuntimeState* state) override {} + std::string get_name() const override; + private: bool _is_finished = false; const std::shared_ptr& _exchanger; diff --git a/be/src/exec/pipeline/exchange/local_exchange_source_operator.cpp b/be/src/exec/pipeline/exchange/local_exchange_source_operator.cpp index 8eed2e8dc749f..9d52998ae2f59 100644 --- a/be/src/exec/pipeline/exchange/local_exchange_source_operator.cpp +++ b/be/src/exec/pipeline/exchange/local_exchange_source_operator.cpp @@ -15,6 +15,7 @@ #include "exec/pipeline/exchange/local_exchange_source_operator.h" #include "column/chunk.h" +#include "exec/pipeline/exchange/local_exchange.h" #include "runtime/runtime_state.h" namespace starrocks::pipeline { @@ -22,6 +23,7 @@ namespace starrocks::pipeline { // Used for PassthroughExchanger. // The input chunk is most likely full, so we don't merge it to avoid copying chunk data. void LocalExchangeSourceOperator::add_chunk(ChunkPtr chunk) { + auto notify = defer_notify(); std::lock_guard l(_chunk_lock); if (_is_finished) { return; @@ -37,6 +39,7 @@ void LocalExchangeSourceOperator::add_chunk(ChunkPtr chunk) { // Only enqueue the partition chunk information here, and merge chunk in pull_chunk(). Status LocalExchangeSourceOperator::add_chunk(ChunkPtr chunk, const std::shared_ptr>& indexes, uint32_t from, uint32_t size, size_t memory_usage) { + auto notify = defer_notify(); std::lock_guard l(_chunk_lock); if (_is_finished) { return Status::OK(); @@ -55,6 +58,7 @@ Status LocalExchangeSourceOperator::add_chunk(ChunkPtr chunk, const std::shared_ Status LocalExchangeSourceOperator::add_chunk(const std::vector& partition_key, std::unique_ptr chunk) { + auto notify = defer_notify(); std::lock_guard l(_chunk_lock); if (_is_finished) { return Status::OK(); @@ -89,18 +93,25 @@ bool LocalExchangeSourceOperator::has_output() const { std::lock_guard l(_chunk_lock); return !_full_chunk_queue.empty() || _partition_rows_num >= _factory->runtime_state()->chunk_size() || - _key_partition_max_rows() > 0 || (_is_finished && _partition_rows_num > 0) || _local_buffer_almost_full(); + _key_partition_max_rows() > 0 || ((_is_finished || _local_buffer_almost_full()) && _partition_rows_num > 0); } Status LocalExchangeSourceOperator::set_finished(RuntimeState* state) { + auto* exchanger = down_cast(_factory)->exchanger(); + exchanger->finish_source(); + // notify local-exchange sink + // notify-condition 1. mem-buffer full 2. all finished + auto notify = exchanger->defer_notify_sink(); std::lock_guard l(_chunk_lock); _is_finished = true; - // clear _full_chunk_queue - { [[maybe_unused]] typeof(_full_chunk_queue) tmp = std::move(_full_chunk_queue); } - // clear _partition_chunk_queue - { [[maybe_unused]] typeof(_partition_chunk_queue) tmp = std::move(_partition_chunk_queue); } - // clear _key_partition_pending_chunks - { [[maybe_unused]] typeof(_partition_key2partial_chunks) tmp = std::move(_partition_key2partial_chunks); } + { + // clear _full_chunk_queue + _full_chunk_queue = {}; + // clear _partition_chunk_queue + _partition_chunk_queue = {}; + // clear _key_partition_pending_chunks + _partition_key2partial_chunks = std::unordered_map, PartialChunks>{}; + } // Subtract the number of rows of buffered chunks from row_count of _memory_manager and make it unblocked. _memory_manager->update_memory_usage(-_local_memory_usage, -_partition_rows_num); _partition_rows_num = 0; @@ -109,6 +120,9 @@ Status LocalExchangeSourceOperator::set_finished(RuntimeState* state) { } StatusOr LocalExchangeSourceOperator::pull_chunk(RuntimeState* state) { + // notify sink + auto* exchanger = down_cast(_factory)->exchanger(); + auto notify = exchanger->defer_notify_sink(); ChunkPtr chunk = _pull_passthrough_chunk(state); if (chunk == nullptr && _key_partition_pending_chunk_empty()) { chunk = _pull_shuffle_chunk(state); @@ -118,6 +132,11 @@ StatusOr LocalExchangeSourceOperator::pull_chunk(RuntimeState* state) return std::move(chunk); } +std::string LocalExchangeSourceOperator::get_name() const { + std::string finished = is_finished() ? "X" : "O"; + return fmt::format("{}_{}_{}({}) {{ has_output:{}}}", _name, _plan_node_id, (void*)this, finished, has_output()); +} + const size_t min_local_memory_limit = 1LL * 1024 * 1024; void LocalExchangeSourceOperator::enter_release_memory_mode() { diff --git a/be/src/exec/pipeline/exchange/local_exchange_source_operator.h b/be/src/exec/pipeline/exchange/local_exchange_source_operator.h index 9fd020b5e38a4..fb2b62be4857a 100644 --- a/be/src/exec/pipeline/exchange/local_exchange_source_operator.h +++ b/be/src/exec/pipeline/exchange/local_exchange_source_operator.h @@ -22,7 +22,7 @@ #include "exec/pipeline/source_operator.h" namespace starrocks::pipeline { - +class LocalExchanger; class LocalExchangeSourceOperator final : public SourceOperator { class PartitionChunk { public: @@ -72,6 +72,7 @@ class LocalExchangeSourceOperator final : public SourceOperator { Status set_finished(RuntimeState* state) override; Status set_finishing(RuntimeState* state) override { + auto notify = defer_notify(); std::lock_guard l(_chunk_lock); _is_finished = true; return Status::OK(); @@ -99,6 +100,8 @@ class LocalExchangeSourceOperator final : public SourceOperator { void set_execute_mode(int performance_level) override; void update_exec_stats(RuntimeState* state) override {} + std::string get_name() const override; + private: ChunkPtr _pull_passthrough_chunk(RuntimeState* state); @@ -153,9 +156,13 @@ class LocalExchangeSourceOperatorFactory final : public SourceOperatorFactory { return source; } + void set_exchanger(LocalExchanger* exchanger) { _exchanger = exchanger; } + LocalExchanger* exchanger() { return _exchanger; } + std::vector& get_sources() { return _sources; } private: + LocalExchanger* _exchanger = nullptr; std::shared_ptr _memory_manager; std::vector _sources; }; diff --git a/be/src/exec/pipeline/exchange/sink_buffer.cpp b/be/src/exec/pipeline/exchange/sink_buffer.cpp index 4be8a271e3cd4..ba5b9b4e51149 100644 --- a/be/src/exec/pipeline/exchange/sink_buffer.cpp +++ b/be/src/exec/pipeline/exchange/sink_buffer.cpp @@ -20,6 +20,7 @@ #include #include +#include "exec/pipeline/schedule/utils.h" #include "fmt/core.h" #include "util/defer_op.h" #include "util/time.h" @@ -209,6 +210,7 @@ int64_t SinkBuffer::_network_time() { } void SinkBuffer::cancel_one_sinker(RuntimeState* const state) { + auto notify = this->defer_notify(); if (--_num_uncancelled_sinkers == 0) { _is_finishing = true; } @@ -307,6 +309,7 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun return; } if (--_num_remaining_eos == 0) { + auto notify = this->defer_notify(); _is_finishing = true; } sink_ctx(instance_id.lo).num_sinker--; @@ -350,6 +353,10 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun } closure->addFailedHandler([this](const ClosureContext& ctx, std::string_view rpc_error_msg) noexcept { + auto query_ctx = _fragment_ctx->runtime_state()->query_ctx(); + auto query_ctx_guard = query_ctx->shared_from_this(); + auto notify = this->defer_notify(); + auto defer = DeferOp([this]() { --_total_in_flight_rpc; }); _is_finishing = true; auto& context = sink_ctx(ctx.instance_id.lo); @@ -365,7 +372,10 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun LOG(WARNING) << err_msg; }); closure->addSuccessHandler([this](const ClosureContext& ctx, const PTransmitChunkResult& result) noexcept { - // when _total_in_flight_rpc desc to 0, _fragment_ctx may be destructed + auto query_ctx = _fragment_ctx->runtime_state()->query_ctx(); + auto query_ctx_guard = query_ctx->shared_from_this(); + auto notify = this->defer_notify(); + auto defer = DeferOp([this]() { --_total_in_flight_rpc; }); Status status(result.status()); auto& context = sink_ctx(ctx.instance_id.lo); diff --git a/be/src/exec/pipeline/exchange/sink_buffer.h b/be/src/exec/pipeline/exchange/sink_buffer.h index f7787c6141578..12c767423ace0 100644 --- a/be/src/exec/pipeline/exchange/sink_buffer.h +++ b/be/src/exec/pipeline/exchange/sink_buffer.h @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -78,7 +79,6 @@ struct TimeTrace { } }; -// TODO(hcf) how to export brpc error class SinkBuffer { public: SinkBuffer(FragmentContext* fragment_ctx, const std::vector& destinations, @@ -100,6 +100,17 @@ class SinkBuffer { void incr_sinker(RuntimeState* state); + void attach_observer(RuntimeState* state, PipelineObserver* observer) { _observable.add_observer(state, observer); } + void notify_observers() { _observable.notify_sink_observers(); } + auto defer_notify() { + return DeferOp([this]() { + _observable.notify_sink_observers(); + if (bthread_self()) { + CHECK(tls_thread_status.mem_tracker() == GlobalEnv::GetInstance()->process_mem_tracker()); + } + }); + } + private: using Mutex = bthread::Mutex; @@ -203,6 +214,8 @@ class SinkBuffer { std::atomic _request_sequence = 0; int64_t _sent_audit_stats_frequency = 1; int64_t _sent_audit_stats_frequency_upper_limit = 64; + + Observable _observable; }; } // namespace starrocks::pipeline diff --git a/be/src/exec/pipeline/fragment_context.cpp b/be/src/exec/pipeline/fragment_context.cpp index 74564dcd10cd8..b75cf6911ad9a 100644 --- a/be/src/exec/pipeline/fragment_context.cpp +++ b/be/src/exec/pipeline/fragment_context.cpp @@ -19,6 +19,8 @@ #include "exec/data_sink.h" #include "exec/pipeline/group_execution/execution_group.h" #include "exec/pipeline/pipeline_driver_executor.h" +#include "exec/pipeline/schedule/pipeline_timer.h" +#include "exec/pipeline/schedule/timeout_tasks.h" #include "exec/pipeline/stream_pipeline_driver.h" #include "exec/workgroup/work_group.h" #include "runtime/batch_write/batch_write_mgr.h" @@ -30,6 +32,7 @@ #include "util/threadpool.h" #include "util/thrift_rpc_helper.h" #include "util/time.h" +#include "util/uid_util.h" namespace starrocks::pipeline { @@ -43,6 +46,7 @@ FragmentContext::~FragmentContext() { if (_plan != nullptr) { _plan->close(_runtime_state.get()); } + clear_pipeline_timer(); } size_t FragmentContext::total_dop() const { @@ -75,6 +79,9 @@ void FragmentContext::count_down_execution_group(size_t val) { if (!all_groups_finished) { return; } + // close fragment context states + // clear all pipeline timers + clear_pipeline_timer(); // dump profile if necessary auto* state = runtime_state(); @@ -200,6 +207,14 @@ void FragmentContext::set_final_status(const Status& status) { iterate_drivers([executor](const DriverPtr& driver) { executor->cancel(driver.get()); }); } + // cancel drivers in event scheduler + iterate_drivers([](const DriverPtr& driver) { + driver->set_need_check_reschedule(true); + if (driver->is_in_blocked()) { + driver->observer()->cancel_trigger(); + } + }); + for (const auto& stream_load_context : _stream_load_contexts) { if (stream_load_context->body_sink) { stream_load_context->body_sink->cancel(_s_status); @@ -228,6 +243,7 @@ void FragmentContext::set_stream_load_contexts(const std::vectorquery_ctx() != nullptr) { _runtime_state->query_ctx()->release_workgroup_token_once(); @@ -325,6 +341,33 @@ void FragmentContext::destroy_pass_through_chunk_buffer() { } } +Status FragmentContext::set_pipeline_timer(PipelineTimer* timer) { + _pipeline_timer = timer; + _timeout_task = new CheckFragmentTimeout(this); + timespec tm = butil::microseconds_to_timespec(butil::gettimeofday_us()); + tm.tv_sec += runtime_state()->query_ctx()->get_query_expire_seconds(); + RETURN_IF_ERROR(_pipeline_timer->schedule(_timeout_task, tm)); + return Status::OK(); +} + +void FragmentContext::clear_pipeline_timer() { + if (_pipeline_timer) { + if (!_rf_timeout_tasks.empty()) { + for (auto& [ignore, task] : _rf_timeout_tasks) { + if (task) { + task->unschedule(_pipeline_timer); + SAFE_DELETE(task); + } + } + _rf_timeout_tasks.clear(); + } + if (_timeout_task) { + _timeout_task->unschedule(_pipeline_timer); + SAFE_DELETE(_timeout_task); + } + } +} + Status FragmentContext::reset_epoch() { _num_finished_epoch_pipelines = 0; const std::function caller = [this](Pipeline* pipeline) { @@ -378,6 +421,7 @@ Status FragmentContext::prepare_active_drivers() { for (auto& group : _execution_groups) { RETURN_IF_ERROR(group->prepare_drivers(_runtime_state.get())); } + RETURN_IF_ERROR(submit_all_timer()); return Status::OK(); } @@ -410,4 +454,25 @@ void FragmentContext::init_event_scheduler() { enable_event_scheduler() ? "true" : "false"); } +void FragmentContext::add_timer_observer(PipelineObserver* observer, uint64_t timeout) { + RFScanWaitTimeout* task; + if (auto iter = _rf_timeout_tasks.find(timeout); iter != _rf_timeout_tasks.end()) { + task = down_cast(iter->second); + } else { + task = new RFScanWaitTimeout(this); + _rf_timeout_tasks.emplace(timeout, task); + } + task->add_observer(_runtime_state.get(), observer); +} + +Status FragmentContext::submit_all_timer() { + timespec tm = butil::microseconds_to_timespec(butil::gettimeofday_us()); + for (auto [delta_ns, task] : _rf_timeout_tasks) { + timespec abstime = tm; + abstime.tv_nsec += delta_ns; + RETURN_IF_ERROR(_pipeline_timer->schedule(task, abstime)); + } + return Status::OK(); +} + } // namespace starrocks::pipeline diff --git a/be/src/exec/pipeline/fragment_context.h b/be/src/exec/pipeline/fragment_context.h index 7d7bd261fdb20..3637b68dd4c9f 100644 --- a/be/src/exec/pipeline/fragment_context.h +++ b/be/src/exec/pipeline/fragment_context.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include "exec/exec_node.h" @@ -26,6 +27,8 @@ #include "exec/pipeline/runtime_filter_types.h" #include "exec/pipeline/scan/morsel.h" #include "exec/pipeline/schedule/event_scheduler.h" +#include "exec/pipeline/schedule/observer.h" +#include "exec/pipeline/schedule/pipeline_timer.h" #include "exec/query_cache/cache_param.h" #include "gen_cpp/FrontendService.h" #include "gen_cpp/HeartbeatService.h" @@ -119,6 +122,8 @@ class FragmentContext { void destroy_pass_through_chunk_buffer(); void set_driver_token(DriverLimiter::TokenPtr driver_token) { _driver_token = std::move(driver_token); } + Status set_pipeline_timer(PipelineTimer* pipeline_timer); + void clear_pipeline_timer(); query_cache::CacheParam& cache_param() { return _cache_param; } @@ -179,6 +184,10 @@ class FragmentContext { EventScheduler* event_scheduler() const { return _event_scheduler.get(); } void init_event_scheduler(); + PipelineTimer* pipeline_timer() { return _pipeline_timer; } + void add_timer_observer(PipelineObserver* observer, uint64_t timeout); + Status submit_all_timer(); + private: void _close_stream_load_contexts(); @@ -206,6 +215,12 @@ class FragmentContext { ExecutionGroups _execution_groups; std::atomic _num_finished_execution_groups = 0; + std::unique_ptr _event_scheduler; + PipelineTimer* _pipeline_timer = nullptr; + PipelineTimerTask* _timeout_task = nullptr; + PipelineTimerTask* _report_state_task = nullptr; + std::unordered_map _rf_timeout_tasks; + RuntimeFilterHub _runtime_filter_hub; MorselQueueFactoryMap _morsel_queue_factories; @@ -240,8 +255,6 @@ class FragmentContext { RuntimeProfile::Counter* _jit_timer = nullptr; bool _report_when_finish{}; - - std::unique_ptr _event_scheduler; }; class FragmentContextManager { diff --git a/be/src/exec/pipeline/fragment_executor.cpp b/be/src/exec/pipeline/fragment_executor.cpp index 2ec32a53a042c..c5cf587181f2b 100644 --- a/be/src/exec/pipeline/fragment_executor.cpp +++ b/be/src/exec/pipeline/fragment_executor.cpp @@ -16,6 +16,7 @@ #include #include +#include #include "common/config.h" #include "exec/capture_version_node.h" @@ -29,6 +30,7 @@ #include "exec/pipeline/fragment_context.h" #include "exec/pipeline/pipeline_builder.h" #include "exec/pipeline/pipeline_driver_executor.h" +#include "exec/pipeline/pipeline_fwd.h" #include "exec/pipeline/result_sink_operator.h" #include "exec/pipeline/scan/connector_scan_operator.h" #include "exec/pipeline/scan/morsel.h" @@ -748,8 +750,26 @@ Status FragmentExecutor::_prepare_pipeline_driver(ExecEnv* exec_env, const Unifi tsink, fragment.output_exprs)); } _fragment_ctx->set_data_sink(std::move(datasink)); - auto group_with_pipelines = builder.build(); - _fragment_ctx->set_pipelines(std::move(group_with_pipelines.first), std::move(group_with_pipelines.second)); + auto [exec_groups, pipelines] = builder.build(); + _fragment_ctx->set_pipelines(std::move(exec_groups), std::move(pipelines)); + + if (runtime_state->query_options().__isset.enable_pipeline_event_scheduler && + runtime_state->query_options().enable_pipeline_event_scheduler) { + // check all pipeline in fragment support event scheduler + bool all_support_event_scheduler = true; + _fragment_ctx->iterate_pipeline([&all_support_event_scheduler](Pipeline* pipeline) { + auto* src = pipeline->source_operator_factory(); + auto* sink = pipeline->sink_operator_factory(); + all_support_event_scheduler = all_support_event_scheduler && src->support_event_scheduler(); + all_support_event_scheduler = all_support_event_scheduler && sink->support_event_scheduler(); + }); + + if (all_support_event_scheduler) { + _fragment_ctx->init_event_scheduler(); + RETURN_IF_ERROR(_fragment_ctx->set_pipeline_timer(exec_env->pipeline_timer())); + } + } + runtime_state->set_enable_event_scheduler(_fragment_ctx->enable_event_scheduler()); RETURN_IF_ERROR(_fragment_ctx->prepare_all_pipelines()); @@ -960,6 +980,8 @@ Status FragmentExecutor::append_incremental_scan_ranges(ExecEnv* exec_env, const if (fragment_ctx == nullptr) return Status::OK(); RuntimeState* runtime_state = fragment_ctx->runtime_state(); + std::unordered_set notify_ids; + for (const auto& [node_id, scan_ranges] : params.per_node_scan_ranges) { if (scan_ranges.size() == 0) continue; auto iter = fragment_ctx->morsel_queue_factories().find(node_id); @@ -977,6 +999,7 @@ Status FragmentExecutor::append_incremental_scan_ranges(ExecEnv* exec_env, const pipeline::ScanMorsel::build_scan_morsels(node_id, scan_ranges, true, &morsels, &has_more_morsel); RETURN_IF_ERROR(morsel_queue_factory->append_morsels(0, std::move(morsels))); morsel_queue_factory->set_has_more(has_more_morsel); + notify_ids.insert(node_id); } if (params.__isset.node_to_per_driver_seq_scan_ranges) { @@ -1000,9 +1023,17 @@ Status FragmentExecutor::append_incremental_scan_ranges(ExecEnv* exec_env, const RETURN_IF_ERROR(morsel_queue_factory->append_morsels(driver_seq, std::move(morsels))); } morsel_queue_factory->set_has_more(has_more_morsel); + notify_ids.insert(node_id); } } + // notify all source + fragment_ctx->iterate_pipeline([&](Pipeline* pipeline) { + if (notify_ids.contains(pipeline->source_operator_factory()->plan_node_id())) { + pipeline->source_operator_factory()->observes().notify_source_observers(); + } + }); + return Status::OK(); } diff --git a/be/src/exec/pipeline/pipeline.cpp b/be/src/exec/pipeline/pipeline.cpp index 6be6c8dcd83c2..c60b2e9bcef8f 100644 --- a/be/src/exec/pipeline/pipeline.cpp +++ b/be/src/exec/pipeline/pipeline.cpp @@ -83,6 +83,11 @@ void Pipeline::instantiate_drivers(RuntimeState* state) { driver = std::make_shared(std::move(operators), query_ctx, fragment_ctx, this, fragment_ctx->next_driver_id()); } + + if (state->enable_event_scheduler()) { + driver->assign_observer(); + } + setup_drivers_profile(driver); driver->set_workgroup(workgroup); _drivers.emplace_back(std::move(driver)); @@ -117,7 +122,7 @@ void Pipeline::setup_pipeline_profile(RuntimeState* runtime_state) { } void Pipeline::setup_drivers_profile(const DriverPtr& driver) { - runtime_profile()->add_info_string("isGroupExecution", + runtime_profile()->add_info_string("IsGroupExecution", _execution_group->is_colocate_exec_group() ? "true" : "false"); runtime_profile()->add_child(driver->runtime_profile(), true, nullptr); auto* dop_counter = diff --git a/be/src/exec/pipeline/pipeline_driver_executor.cpp b/be/src/exec/pipeline/pipeline_driver_executor.cpp index b4da5f9d68cd8..0b3aa36afd696 100644 --- a/be/src/exec/pipeline/pipeline_driver_executor.cpp +++ b/be/src/exec/pipeline/pipeline_driver_executor.cpp @@ -102,6 +102,8 @@ void GlobalDriverExecutor::_worker_thread() { if (driver == nullptr) { continue; } + DCHECK(!driver->is_in_ready()); + DCHECK(!driver->is_in_blocked()); if (current_thread != nullptr) { current_thread->set_idle(false); @@ -257,6 +259,9 @@ StatusOr GlobalDriverExecutor::_get_next_driver(std::queuestart_timers(); + if (driver->fragment_ctx()->enable_event_scheduler()) { + driver->fragment_ctx()->event_scheduler()->attach_queue(_driver_queue.get()); + } if (driver->is_precondition_block()) { driver->set_driver_state(DriverState::PRECONDITION_BLOCK); diff --git a/be/src/exec/pipeline/pipeline_driver_poller.cpp b/be/src/exec/pipeline/pipeline_driver_poller.cpp index abb0aae082cec..3c971bca6f124 100644 --- a/be/src/exec/pipeline/pipeline_driver_poller.cpp +++ b/be/src/exec/pipeline/pipeline_driver_poller.cpp @@ -16,6 +16,7 @@ #include +#include "exec/pipeline/pipeline_fwd.h" #include "util/time_guard.h" namespace starrocks::pipeline { @@ -185,6 +186,12 @@ void PipelineDriverPoller::run_internal() { } void PipelineDriverPoller::add_blocked_driver(const DriverRawPtr driver) { + auto event_scheduler = driver->fragment_ctx()->event_scheduler(); + if (event_scheduler != nullptr) { + event_scheduler->add_blocked_driver(driver); + return; + } + std::unique_lock lock(_global_mutex); _blocked_drivers.push_back(driver); _num_drivers++; diff --git a/be/src/exec/pipeline/pipeline_driver_queue.cpp b/be/src/exec/pipeline/pipeline_driver_queue.cpp index 0caf3f69b3394..2822daf32c972 100644 --- a/be/src/exec/pipeline/pipeline_driver_queue.cpp +++ b/be/src/exec/pipeline/pipeline_driver_queue.cpp @@ -49,6 +49,7 @@ void QuerySharedDriverQueue::put_back(const DriverRawPtr driver) { driver->set_driver_queue_level(level); { std::lock_guard lock(_global_mutex); + DCHECK(!driver->is_in_ready()); _queues[level].put(driver); driver->set_in_ready(true); driver->set_in_queue(this); diff --git a/be/src/exec/pipeline/result_sink_operator.cpp b/be/src/exec/pipeline/result_sink_operator.cpp index fd476e0d4955a..0c37fa9d62d0d 100644 --- a/be/src/exec/pipeline/result_sink_operator.cpp +++ b/be/src/exec/pipeline/result_sink_operator.cpp @@ -36,6 +36,8 @@ Status ResultSinkOperator::prepare(RuntimeState* state) { // Create profile _unique_metrics->add_info_string("SinkType", to_string(_sink_type)); auto profile = _unique_metrics.get(); + _sender->attach_query_ctx(state->query_ctx()->get_shared_ptr()); + _sender->attach_observer(state, observer()); // Create writer based on sink type switch (_sink_type) { diff --git a/be/src/exec/pipeline/result_sink_operator.h b/be/src/exec/pipeline/result_sink_operator.h index fcffd36d5b7b5..3db54d1db3303 100644 --- a/be/src/exec/pipeline/result_sink_operator.h +++ b/be/src/exec/pipeline/result_sink_operator.h @@ -106,6 +106,8 @@ class ResultSinkOperatorFactory final : public OperatorFactory { ~ResultSinkOperatorFactory() override = default; + bool support_event_scheduler() const override { return true; } + OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override { // _num_sinkers is incremented when creating a ResultSinkOperator instance here at the preparing // phase of FragmentExecutor, and decremented and read when closing ResultSinkOperator. The visibility diff --git a/be/src/exec/pipeline/scan/chunk_buffer_limiter.cpp b/be/src/exec/pipeline/scan/chunk_buffer_limiter.cpp index 7f478b72bddcc..832a4c1783ec7 100644 --- a/be/src/exec/pipeline/scan/chunk_buffer_limiter.cpp +++ b/be/src/exec/pipeline/scan/chunk_buffer_limiter.cpp @@ -40,14 +40,17 @@ void DynamicChunkBufferLimiter::update_avg_row_bytes(size_t added_sum_row_bytes, ChunkBufferTokenPtr DynamicChunkBufferLimiter::pin(int num_chunks) { size_t prev_value = _pinned_chunks_counter.fetch_add(num_chunks); if (prev_value + num_chunks > _capacity) { - _unpin(num_chunks); + unpin(num_chunks); return nullptr; } - return std::make_unique(_pinned_chunks_counter, num_chunks); + return std::make_unique(*this, num_chunks); } -void DynamicChunkBufferLimiter::_unpin(int num_chunks) { +void DynamicChunkBufferLimiter::unpin(int num_chunks) { int prev_value = _pinned_chunks_counter.fetch_sub(num_chunks); + if (prev_value >= _capacity && !is_full()) { + _has_full_event = true; + } DCHECK_GE(prev_value, 1); } diff --git a/be/src/exec/pipeline/scan/chunk_buffer_limiter.h b/be/src/exec/pipeline/scan/chunk_buffer_limiter.h index 715006dcca2e5..6d337d7b6cbd4 100644 --- a/be/src/exec/pipeline/scan/chunk_buffer_limiter.h +++ b/be/src/exec/pipeline/scan/chunk_buffer_limiter.h @@ -59,6 +59,7 @@ class ChunkBufferLimiter { virtual size_t default_capacity() const = 0; // Update mem limit of this chunk buffer virtual void update_mem_limit(int64_t value) {} + virtual bool has_full_events() { return false; } }; // The capacity of this limiter is unlimited. @@ -100,15 +101,14 @@ class DynamicChunkBufferLimiter final : public ChunkBufferLimiter { public: class Token final : public ChunkBufferToken { public: - Token(std::atomic& pinned_tokens_counter, int num_tokens) - : _pinned_tokens_counter(pinned_tokens_counter), _num_tokens(num_tokens) {} + Token(DynamicChunkBufferLimiter& limiter, int num_tokens) : _limiter(limiter), _num_tokens(num_tokens) {} - ~Token() override { _pinned_tokens_counter.fetch_sub(_num_tokens); } + ~Token() override { _limiter.unpin(_num_tokens); } DISALLOW_COPY_AND_MOVE(Token); private: - std::atomic& _pinned_tokens_counter; + DynamicChunkBufferLimiter& _limiter; const int _num_tokens; }; @@ -124,16 +124,22 @@ class DynamicChunkBufferLimiter final : public ChunkBufferLimiter { void update_avg_row_bytes(size_t added_sum_row_bytes, size_t added_num_rows, size_t max_chunk_rows) override; ChunkBufferTokenPtr pin(int num_chunks) override; + void unpin(int num_chunks); bool is_full() const override { return _pinned_chunks_counter >= _capacity; } size_t size() const override { return _pinned_chunks_counter; } size_t capacity() const override { return _capacity; } size_t default_capacity() const override { return _default_capacity; } void update_mem_limit(int64_t value) override; + bool has_full_events() override { + if (!_has_full_event.load(std::memory_order_acquire)) { + return false; + } + bool val = true; + return _has_full_event.compare_exchange_strong(val, false); + } private: - void _unpin(int num_chunks); - private: std::mutex _mutex; size_t _sum_row_bytes = 0; @@ -146,6 +152,8 @@ class DynamicChunkBufferLimiter final : public ChunkBufferLimiter { std::atomic _mem_limit; std::atomic _pinned_chunks_counter = 0; + + std::atomic _has_full_event{}; }; } // namespace starrocks::pipeline diff --git a/be/src/exec/pipeline/scan/chunk_source.cpp b/be/src/exec/pipeline/scan/chunk_source.cpp index beda11721dc7f..15d229b09262f 100644 --- a/be/src/exec/pipeline/scan/chunk_source.cpp +++ b/be/src/exec/pipeline/scan/chunk_source.cpp @@ -61,12 +61,15 @@ Status ChunkSource::buffer_next_batch_chunks_blocking(RuntimeState* state, size_ { SCOPED_RAW_TIMER(&time_spent_ns); + // TODO: process when buffer full if (_chunk_token == nullptr && (_chunk_token = _chunk_buffer.limiter()->pin(1)) == nullptr) { break; } ChunkPtr chunk; _status = _read_chunk(state, &chunk); + // notify when generate new chunk + auto notify = scan_defer_notify(_scan_op); // we always output a empty chunk instead of nullptr, because we need set tablet_id and is_last_chunk flag // in the chunk. if (chunk == nullptr) { diff --git a/be/src/exec/pipeline/scan/olap_scan_context.cpp b/be/src/exec/pipeline/scan/olap_scan_context.cpp index bb24b8e23399e..b1a95457e0d44 100644 --- a/be/src/exec/pipeline/scan/olap_scan_context.cpp +++ b/be/src/exec/pipeline/scan/olap_scan_context.cpp @@ -52,14 +52,17 @@ void OlapScanContext::attach_shared_input(int32_t operator_seq, int32_t source_i auto key = std::make_pair(operator_seq, source_index); VLOG_ROW << fmt::format("attach_shared_input ({}, {}), active {}", operator_seq, source_index, _active_inputs.size()); - _active_inputs.emplace(key); + _num_active_inputs += _active_inputs.emplace(key).second; } void OlapScanContext::detach_shared_input(int32_t operator_seq, int32_t source_index) { auto key = std::make_pair(operator_seq, source_index); VLOG_ROW << fmt::format("detach_shared_input ({}, {}), remain {}", operator_seq, source_index, _active_inputs.size()); - _active_inputs.erase(key); + int erased = _active_inputs.erase(key); + if (erased && _num_active_inputs.fetch_sub(1) == 1) { + _active_inputs_empty = true; + } } bool OlapScanContext::has_active_input() const { diff --git a/be/src/exec/pipeline/scan/olap_scan_context.h b/be/src/exec/pipeline/scan/olap_scan_context.h index 9f45f463dfec1..03d38e337f9c9 100644 --- a/be/src/exec/pipeline/scan/olap_scan_context.h +++ b/be/src/exec/pipeline/scan/olap_scan_context.h @@ -21,7 +21,9 @@ #include "column/column_access_path.h" #include "exec/olap_scan_prepare.h" #include "exec/pipeline/context_with_dependency.h" +#include "exec/pipeline/operator.h" #include "exec/pipeline/scan/balanced_chunk_buffer.h" +#include "exec/pipeline/schedule/observer.h" #include "runtime/global_dict/parser.h" #include "storage/rowset/rowset.h" #include "util/phmap/phmap_fwd_decl.h" @@ -124,6 +126,17 @@ class OlapScanContext final : public ContextWithDependency { int64_t get_scan_table_id() const { return _scan_table_id; } + void attach_observer(RuntimeState* state, PipelineObserver* observer) { _observable.add_observer(state, observer); } + void notify_observers() { _observable.notify_source_observers(); } + size_t only_one_observer() const { return _observable.num_observers() == 1; } + bool active_inputs_empty_event() { + if (!_active_inputs_empty.load(std::memory_order_acquire)) { + return false; + } + bool val = true; + return _active_inputs_empty.compare_exchange_strong(val, false); + } + private: OlapScanNode* _scan_node; int64_t _scan_table_id; @@ -142,7 +155,9 @@ class OlapScanContext final : public ContextWithDependency { typename std::allocator, NUM_LOCK_SHARD_LOG, std::mutex, true>; BalancedChunkBuffer& _chunk_buffer; // Shared Chunk buffer for all scan operators, owned by OlapScanContextFactory. ActiveInputSet _active_inputs; // Maintain the active chunksource - bool _shared_scan; // Enable shared_scan + std::atomic_int _num_active_inputs{}; + std::atomic_bool _active_inputs_empty{}; + bool _shared_scan; // Enable shared_scan std::atomic _is_prepare_finished{false}; @@ -153,6 +168,9 @@ class OlapScanContext final : public ContextWithDependency { std::vector _tablets; MultiRowsetReleaseGuard _rowset_release_guard; ConcurrentJitRewriter& _jit_rewriter; + + // the scan operator observe when task finished + Observable _observable; }; // OlapScanContextFactory creates different contexts for each scan operator, if _shared_scan is false. diff --git a/be/src/exec/pipeline/scan/olap_scan_operator.cpp b/be/src/exec/pipeline/scan/olap_scan_operator.cpp index 6670fd6d337ef..aef4d2c448cf6 100644 --- a/be/src/exec/pipeline/scan/olap_scan_operator.cpp +++ b/be/src/exec/pipeline/scan/olap_scan_operator.cpp @@ -18,6 +18,7 @@ #include "exec/olap_scan_node.h" #include "exec/pipeline/scan/olap_chunk_source.h" #include "exec/pipeline/scan/olap_scan_context.h" +#include "fmt/format.h" #include "runtime/current_thread.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" @@ -92,9 +93,10 @@ bool OlapScanOperator::is_finished() const { return ScanOperator::is_finished(); } -Status OlapScanOperator::do_prepare(RuntimeState*) { +Status OlapScanOperator::do_prepare(RuntimeState* state) { bool shared_scan = _ctx->is_shared_scan(); _unique_metrics->add_info_string("SharedScan", shared_scan ? "True" : "False"); + _ctx->attach_observer(state, observer()); return Status::OK(); } @@ -126,4 +128,21 @@ BalancedChunkBuffer& OlapScanOperator::get_chunk_buffer() const { return _ctx->get_chunk_buffer(); } +bool OlapScanOperator::need_notify_all() { + return (!_ctx->only_one_observer() && _ctx->active_inputs_empty_event()) || has_full_events(); +} + +std::string OlapScanOperator::get_name() const { + std::string finished = is_finished() ? "X" : "O"; + bool full = is_buffer_full(); + int io_tasks = _num_running_io_tasks; + bool has_active = _ctx->has_active_input(); + std::string morsel_queue_name = _morsel_queue->name(); + bool morsel_queue_empty = _morsel_queue->empty(); + return fmt::format( + "{}_{}_{}({}) {{ full:{} iostasks:{} has_active:{} num_chunks:{} morsel:{} empty:{} has_output:{}}}", _name, + _plan_node_id, (void*)this, finished, full, io_tasks, has_active, num_buffered_chunks(), morsel_queue_name, + morsel_queue_empty, has_output()); +} + } // namespace starrocks::pipeline diff --git a/be/src/exec/pipeline/scan/olap_scan_operator.h b/be/src/exec/pipeline/scan/olap_scan_operator.h index 1c6724c468568..1ee60e29100c5 100644 --- a/be/src/exec/pipeline/scan/olap_scan_operator.h +++ b/be/src/exec/pipeline/scan/olap_scan_operator.h @@ -43,6 +43,8 @@ class OlapScanOperatorFactory final : public ScanOperatorFactory { TPartitionType::type partition_type() const override { return TPartitionType::BUCKET_SHUFFLE_HASH_PARTITIONED; } const std::vector& partition_exprs() const override; + bool support_event_scheduler() const override { return true; } + private: OlapScanContextFactoryPtr _ctx_factory; }; @@ -62,12 +64,14 @@ class OlapScanOperator final : public ScanOperator { ChunkSourcePtr create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) override; int64_t get_scan_table_id() const override; + std::string get_name() const override; protected: void attach_chunk_source(int32_t source_index) override; void detach_chunk_source(int32_t source_index) override; bool has_shared_chunk_source() const override; BalancedChunkBuffer& get_chunk_buffer() const override; + bool need_notify_all() override; private: OlapScanContextPtr _ctx; diff --git a/be/src/exec/pipeline/scan/olap_scan_prepare_operator.cpp b/be/src/exec/pipeline/scan/olap_scan_prepare_operator.cpp index 5ddaa46d61c4a..649e98a1c6528 100644 --- a/be/src/exec/pipeline/scan/olap_scan_prepare_operator.cpp +++ b/be/src/exec/pipeline/scan/olap_scan_prepare_operator.cpp @@ -85,6 +85,7 @@ StatusOr OlapScanPrepareOperator::pull_chunk(RuntimeState* state) { DeferOp defer([&]() { _ctx->set_prepare_finished(); + _ctx->notify_observers(); TEST_SYNC_POINT("OlapScnPrepareOperator::pull_chunk::after_set_prepare_finished"); }); diff --git a/be/src/exec/pipeline/scan/olap_scan_prepare_operator.h b/be/src/exec/pipeline/scan/olap_scan_prepare_operator.h index 0fb4e337149e8..a4a2d7cd6c86f 100644 --- a/be/src/exec/pipeline/scan/olap_scan_prepare_operator.h +++ b/be/src/exec/pipeline/scan/olap_scan_prepare_operator.h @@ -50,6 +50,8 @@ class OlapScanPrepareOperatorFactory final : public SourceOperatorFactory { OlapScanContextFactoryPtr ctx_factory); ~OlapScanPrepareOperatorFactory() override = default; + bool support_event_scheduler() const override { return true; } + bool with_morsels() const override { return true; } Status prepare(RuntimeState* state) override; diff --git a/be/src/exec/pipeline/scan/scan_operator.cpp b/be/src/exec/pipeline/scan/scan_operator.cpp index cf79702baafd9..5c62844853296 100644 --- a/be/src/exec/pipeline/scan/scan_operator.cpp +++ b/be/src/exec/pipeline/scan/scan_operator.cpp @@ -23,12 +23,14 @@ #include "exec/pipeline/limit_operator.h" #include "exec/pipeline/pipeline_builder.h" #include "exec/pipeline/scan/connector_scan_operator.h" +#include "exec/pipeline/schedule/common.h" #include "exec/workgroup/scan_executor.h" #include "exec/workgroup/work_group.h" #include "runtime/current_thread.h" #include "runtime/exec_env.h" #include "util/debug/query_trace.h" #include "util/failpoint/fail_point.h" +#include "util/race_detect.h" #include "util/runtime_profile.h" namespace starrocks::pipeline { @@ -172,6 +174,7 @@ bool ScanOperator::has_output() const { // Can pick up more morsels or submit more tasks if (!_morsel_queue->empty()) { + std::shared_lock guard(_task_mutex); auto status_or_is_ready = _morsel_queue->ready_for_next(); if (status_or_is_ready.ok() && status_or_is_ready.value()) { return true; @@ -241,6 +244,7 @@ void ScanOperator::update_exec_stats(RuntimeState* state) { } Status ScanOperator::set_finishing(RuntimeState* state) { + auto notify = scan_defer_notify(this); // check when expired, are there running io tasks or submitted tasks if (UNLIKELY(state != nullptr && state->query_ctx()->is_query_expired() && (_num_running_io_tasks > 0 || _submit_task_counter->value() == 0))) { @@ -259,8 +263,9 @@ Status ScanOperator::set_finishing(RuntimeState* state) { } StatusOr ScanOperator::pull_chunk(RuntimeState* state) { + RACE_DETECT(race_pull_chunk); RETURN_IF_ERROR(_get_scan_status()); - + auto defer = scan_defer_notify(this); _peak_buffer_size_counter->set(buffer_size()); _peak_buffer_memory_usage->set(buffer_memory_usage()); @@ -431,11 +436,11 @@ Status ScanOperator::_trigger_next_scan(RuntimeState* state, int chunk_source_in #if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) FAIL_POINT_SCOPE(mem_alloc_error); #endif - DeferOp timer_defer([chunk_source]() { COUNTER_SET(chunk_source->scan_timer(), chunk_source->io_task_wait_timer()->value() + chunk_source->io_task_exec_timer()->value()); }); + auto notify = scan_defer_notify(this); COUNTER_UPDATE(chunk_source->io_task_wait_timer(), MonotonicNanos() - io_task_start_nano); SCOPED_TIMER(chunk_source->io_task_exec_timer()); diff --git a/be/src/exec/pipeline/scan/scan_operator.h b/be/src/exec/pipeline/scan/scan_operator.h index dc6a2c8deb8f1..bc2e8907110ae 100644 --- a/be/src/exec/pipeline/scan/scan_operator.h +++ b/be/src/exec/pipeline/scan/scan_operator.h @@ -20,6 +20,7 @@ #include "exec/query_cache/cache_operator.h" #include "exec/query_cache/lane_arbiter.h" #include "exec/workgroup/work_group_fwd.h" +#include "util/race_detect.h" #include "util/spinlock.h" namespace starrocks { @@ -98,6 +99,20 @@ class ScanOperator : public SourceOperator { void update_exec_stats(RuntimeState* state) override; + bool has_full_events() { return get_chunk_buffer().limiter()->has_full_events(); } + virtual bool need_notify_all() { return true; } + + template + auto defer_notify(NotifyAll notify_all) { + return DeferOp([this, notify_all]() { + if (notify_all()) { + _source_factory()->observes().notify_source_observers(); + } else { + _observable.notify_source_observers(); + } + }); + } + protected: static constexpr size_t kIOTaskBatchSize = 64; @@ -218,6 +233,8 @@ class ScanOperator : public SourceOperator { RuntimeProfile::Counter* _prepare_chunk_source_timer = nullptr; RuntimeProfile::Counter* _submit_io_task_timer = nullptr; + + DECLARE_RACE_DETECTOR(race_pull_chunk) }; class ScanOperatorFactory : public SourceOperatorFactory { @@ -248,6 +265,10 @@ class ScanOperatorFactory : public SourceOperatorFactory { std::shared_ptr _scan_task_group; }; +inline auto scan_defer_notify(ScanOperator* scan_op) { + return scan_op->defer_notify([scan_op]() -> bool { return scan_op->need_notify_all(); }); +} + pipeline::OpFactories decompose_scan_node_to_pipeline(std::shared_ptr factory, ScanNode* scan_node, pipeline::PipelineBuilderContext* context); diff --git a/be/src/exec/pipeline/schedule/observer.cpp b/be/src/exec/pipeline/schedule/observer.cpp index a19b9a432ec7f..1d987a5aa71cc 100644 --- a/be/src/exec/pipeline/schedule/observer.cpp +++ b/be/src/exec/pipeline/schedule/observer.cpp @@ -21,10 +21,6 @@ namespace starrocks::pipeline { static void on_update(PipelineDriver* driver) { auto sink = driver->sink_operator(); auto source = driver->source_operator(); - TRACE_SCHEDULE_LOG << "notify driver:" << driver << " state:" << driver->driver_state() - << " in_block_queue:" << driver->is_in_blocked() << " source finished:" << source->is_finished() - << " operator has output:" << source->has_output() << " sink finished:" << sink->is_finished() - << " sink need input:" << sink->need_input() << ":" << driver->to_readable_string(); if (sink->is_finished() || sink->need_input() || source->is_finished() || source->has_output()) { driver->fragment_ctx()->event_scheduler()->try_schedule(driver); } @@ -47,6 +43,12 @@ static void on_source_update(PipelineDriver* driver) { void PipelineObserver::_do_update(int event) { auto driver = _driver; auto token = driver->acquire_schedule_token(); + auto sink = driver->sink_operator(); + auto source = driver->source_operator(); + TRACE_SCHEDULE_LOG << "notify driver:" << driver << " state:" << driver->driver_state() + << " in_block_queue:" << driver->is_in_blocked() << " source finished:" << source->is_finished() + << " operator has output:" << source->has_output() << " sink finished:" << sink->is_finished() + << " sink need input:" << sink->need_input() << ":" << driver->to_readable_string(); if (driver->is_in_blocked()) { // In PRECONDITION state, has_output need_input may return false. In this case, diff --git a/be/src/exec/pipeline/schedule/timeout_tasks.cpp b/be/src/exec/pipeline/schedule/timeout_tasks.cpp new file mode 100644 index 0000000000000..37025bcae81d9 --- /dev/null +++ b/be/src/exec/pipeline/schedule/timeout_tasks.cpp @@ -0,0 +1,42 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "exec/pipeline/schedule/timeout_tasks.h" + +#include "exec/pipeline/fragment_context.h" +#include "exec/pipeline/pipeline_fwd.h" +#include "exec/pipeline/schedule/common.h" +#include "util/stack_util.h" + +namespace starrocks::pipeline { +void CheckFragmentTimeout::Run() { + auto query_ctx = _fragment_ctx->runtime_state()->query_ctx(); + size_t expire_seconds = query_ctx->get_query_expire_seconds(); + TRACE_SCHEDULE_LOG << "fragment_instance_id:" << print_id(_fragment_ctx->fragment_instance_id()); + _fragment_ctx->cancel(Status::TimedOut(fmt::format("Query exceeded time limit of {} seconds", expire_seconds))); + + _fragment_ctx->iterate_drivers([](const DriverPtr& driver) { + driver->set_need_check_reschedule(true); + if (driver->is_in_blocked()) { + LOG(WARNING) << "[Driver] Timeout " << driver->to_readable_string(); + driver->observer()->cancel_trigger(); + } + }); +} + +void RFScanWaitTimeout::Run() { + _timeout.notify_source_observers(); +} + +} // namespace starrocks::pipeline \ No newline at end of file diff --git a/be/src/exec/pipeline/schedule/timeout_tasks.h b/be/src/exec/pipeline/schedule/timeout_tasks.h new file mode 100644 index 0000000000000..b6b5ef2ad5369 --- /dev/null +++ b/be/src/exec/pipeline/schedule/timeout_tasks.h @@ -0,0 +1,46 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "exec/pipeline/schedule/observer.h" +#include "exec/pipeline/schedule/pipeline_timer.h" +#include "runtime/descriptors.h" + +namespace starrocks::pipeline { +class FragmentContext; + +// TimerTask object, fragment->cancel is called if the timeout is reached. +class CheckFragmentTimeout final : public PipelineTimerTask { +public: + CheckFragmentTimeout(FragmentContext* fragment_ctx) : _fragment_ctx(fragment_ctx) {} + void Run() override; + +private: + FragmentContext* _fragment_ctx; +}; + +// If the timeout is reached, a cancel_update event is sent to all objects observing _timeout. +class RFScanWaitTimeout final : public PipelineTimerTask { +public: + RFScanWaitTimeout(FragmentContext* fragment_ctx) : _fragment_ctx(fragment_ctx) {} + void add_observer(RuntimeState* state, PipelineObserver* observer) { _timeout.add_observer(state, observer); } + void Run() override; + +private: + FragmentContext* _fragment_ctx; + Observable _timeout; +}; + +} // namespace starrocks::pipeline \ No newline at end of file diff --git a/be/src/exec/pipeline/sink/blackhole_table_sink_operator.h b/be/src/exec/pipeline/sink/blackhole_table_sink_operator.h index 38ed6cb6b4537..cfd36a5ac438d 100644 --- a/be/src/exec/pipeline/sink/blackhole_table_sink_operator.h +++ b/be/src/exec/pipeline/sink/blackhole_table_sink_operator.h @@ -52,6 +52,8 @@ class BlackHoleTableSinkOperatorFactory final : public OperatorFactory { ~BlackHoleTableSinkOperatorFactory() override = default; + bool support_event_scheduler() const override { return true; } + OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override { return std::make_shared(this, _id, _plan_node_id, driver_sequence); } diff --git a/be/src/exec/pipeline/source_operator.h b/be/src/exec/pipeline/source_operator.h index a977cf0d6098d..0efd0fb88026b 100644 --- a/be/src/exec/pipeline/source_operator.h +++ b/be/src/exec/pipeline/source_operator.h @@ -19,7 +19,9 @@ #include "exec/pipeline/adaptive/adaptive_fwd.h" #include "exec/pipeline/operator.h" #include "exec/pipeline/scan/chunk_source.h" +#include "exec/pipeline/schedule/observer.h" #include "exec/workgroup/work_group_fwd.h" +#include "runtime/descriptors.h" namespace starrocks { @@ -115,6 +117,9 @@ class SourceOperatorFactory : public OperatorFactory { SourceOperatorFactory* group_leader() const; void union_group(SourceOperatorFactory* other_group); + const Observable& observes() const { return _sources_observes; } + Observable& observes() { return _sources_observes; } + protected: size_t _degree_of_parallelism = 1; bool _could_local_shuffle = true; @@ -129,6 +134,8 @@ class SourceOperatorFactory : public OperatorFactory { std::vector _group_dependent_pipelines; EventPtr _group_initialize_event = nullptr; EventPtr _adaptive_blocking_event = nullptr; + + Observable _sources_observes; }; class SourceOperator : public Operator { @@ -145,6 +152,13 @@ class SourceOperator : public Operator { // which will lead to drastic performance deduction (the "ScheduleTime" in profile will be super high). virtual bool is_mutable() const { return false; } + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Operator::prepare(state)); + _observable.add_observer(state, _observer); + _source_factory()->observes().add_observer(state, _observer); + return Status::OK(); + } + Status push_chunk(RuntimeState* state, const ChunkPtr& chunk) override { return Status::InternalError("Shouldn't push chunk to source operator"); } @@ -157,10 +171,18 @@ class SourceOperator : public Operator { return _source_factory()->group_dependent_pipelines(); } + // Donot call notify in any lock scope + auto defer_notify() { + return DeferOp([this]() { _observable.notify_source_observers(); }); + } + protected: const SourceOperatorFactory* _source_factory() const { return down_cast(_factory); } + SourceOperatorFactory* _source_factory() { return down_cast(_factory); } MorselQueue* _morsel_queue = nullptr; + + Observable _observable; }; } // namespace pipeline diff --git a/be/src/exec/pipeline/stream_epoch_manager.h b/be/src/exec/pipeline/stream_epoch_manager.h index 3dbea07f25ee3..4156414eefb71 100644 --- a/be/src/exec/pipeline/stream_epoch_manager.h +++ b/be/src/exec/pipeline/stream_epoch_manager.h @@ -23,8 +23,7 @@ class TMVStartEpochTask; } // namespace starrocks namespace starrocks::pipeline { - -using FragmentContext = pipeline::FragmentContext; +class FragmentContext; using TabletId2BinlogOffset = std::unordered_map; using NodeId2ScanRanges = std::unordered_map; diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index a757049863bcf..562b74336ee97 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -249,6 +249,7 @@ void BufferControlBlock::cancel_pending_rpc() { // seems no use? Status BufferControlBlock::get_batch(TFetchDataResult* result) { + auto notify = defer_notify(); std::unique_ptr ser = nullptr; { std::unique_lock l(_lock); @@ -297,6 +298,7 @@ Status BufferControlBlock::get_batch(TFetchDataResult* result) { } void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { + auto notify = defer_notify(); std::unique_lock l(_lock); if (!_status.ok()) { ctx->on_failure(_status); diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 78c6e7dc030f8..57b9a322b9fea 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -44,7 +44,11 @@ #include "common/status.h" #include "common/statusor.h" +#include "exec/pipeline/schedule/observer.h" #include "gen_cpp/Types_types.h" +#include "runtime/current_thread.h" +#include "runtime/descriptors.h" +#include "runtime/exec_env.h" #include "runtime/query_statistics.h" #include "util/race_detect.h" #include "util/runtime_profile.h" @@ -135,6 +139,25 @@ class BufferControlBlock { } } + void attach_observer(RuntimeState* state, pipeline::PipelineObserver* observer) { + _observable.add_observer(state, observer); + } + + auto defer_notify() { + return DeferOp([query_ctx = _query_ctx, this]() { + if (auto ctx = query_ctx.lock()) { + this->_observable.notify_source_observers(); + CHECK(tls_thread_status.mem_tracker() == GlobalEnv::GetInstance()->process_mem_tracker()); + } + }); + } + + void attach_query_ctx(const std::shared_ptr& query_ctx) { + if (_query_ctx.use_count() == 0) { + _query_ctx = query_ctx; + } + } + private: void _process_batch_without_lock(std::unique_ptr& result); @@ -175,6 +198,9 @@ class BufferControlBlock { // multithreaded access. std::shared_ptr _query_statistics; static const size_t _max_memory_usage = 1UL << 28; // 256MB + + std::weak_ptr _query_ctx; + pipeline::Observable _observable; }; } // namespace starrocks diff --git a/be/src/runtime/data_stream_mgr.cpp b/be/src/runtime/data_stream_mgr.cpp index 0f4d075f610ae..659b897b3d1db 100644 --- a/be/src/runtime/data_stream_mgr.cpp +++ b/be/src/runtime/data_stream_mgr.cpp @@ -37,6 +37,7 @@ #include #include +#include "gen_cpp/internal_service.pb.h" #include "glog/logging.h" #include "runtime/current_thread.h" #include "runtime/data_stream_recvr.h" diff --git a/be/src/runtime/data_stream_mgr.h b/be/src/runtime/data_stream_mgr.h index 9f032285922c6..840724374212d 100644 --- a/be/src/runtime/data_stream_mgr.h +++ b/be/src/runtime/data_stream_mgr.h @@ -46,7 +46,6 @@ #include "common/object_pool.h" #include "common/status.h" #include "gen_cpp/Types_types.h" // for TUniqueId -#include "gen_cpp/internal_service.pb.h" #include "runtime/descriptors.h" // for PlanNodeId #include "runtime/local_pass_through_buffer.h" #include "runtime/mem_tracker.h" diff --git a/be/src/runtime/data_stream_recvr.cpp b/be/src/runtime/data_stream_recvr.cpp index 38a8ca7dc73f1..bb31ff3e453f5 100644 --- a/be/src/runtime/data_stream_recvr.cpp +++ b/be/src/runtime/data_stream_recvr.cpp @@ -41,8 +41,10 @@ #include #include "column/chunk.h" +#include "exec/pipeline/query_context.h" #include "exec/sort_exec_exprs.h" #include "gen_cpp/data.pb.h" +#include "gen_cpp/internal_service.pb.h" #include "runtime/chunk_cursor.h" #include "runtime/current_thread.h" #include "runtime/data_stream_mgr.h" @@ -57,11 +59,6 @@ #include "util/phmap/phmap.h" #include "util/runtime_profile.h" -using std::list; -using std::vector; -using std::pair; -using std::make_pair; - namespace starrocks { Status DataStreamRecvr::create_merger(RuntimeState* state, RuntimeProfile* profile, const SortExecExprs* exprs, @@ -216,6 +213,12 @@ void DataStreamRecvr::bind_profile(int32_t driver_sequence, const std::shared_pt "PeakBufferMemoryBytes", TUnit::BYTES, RuntimeProfile::Counter::create_strategy(TUnit::BYTES)); } +void DataStreamRecvr::attach_query_ctx(pipeline::QueryContext* query_ctx) { + if (_query_ctx.use_count() == 0) { + _query_ctx = query_ctx->get_shared_ptr(); + } +} + Status DataStreamRecvr::get_next(ChunkPtr* chunk, bool* eos) { DCHECK(_chunks_merger.get() != nullptr); return _chunks_merger->get_next(chunk, eos); @@ -239,7 +242,12 @@ bool DataStreamRecvr::is_data_ready() { Status DataStreamRecvr::add_chunks(const PTransmitChunkParams& request, ::google::protobuf::Closure** done) { MemTracker* prev_tracker = tls_thread_status.set_mem_tracker(_instance_mem_tracker.get()); - DeferOp op([&] { tls_thread_status.set_mem_tracker(prev_tracker); }); + DeferOp op([&] { + tls_thread_status.set_mem_tracker(prev_tracker); + DCHECK(prev_tracker == nullptr || prev_tracker == GlobalEnv::GetInstance()->process_mem_tracker()); + }); + // TODO: We just need to notify the affected channels. + auto notify = this->defer_notify(); auto& metrics = get_metrics_round_robin(); SCOPED_TIMER(metrics.process_total_timer); @@ -256,17 +264,20 @@ Status DataStreamRecvr::add_chunks(const PTransmitChunkParams& request, ::google } void DataStreamRecvr::remove_sender(int sender_id, int be_number) { + auto notify = this->defer_notify(); int use_sender_id = _is_merging ? sender_id : 0; _sender_queues[use_sender_id]->decrement_senders(be_number); } void DataStreamRecvr::cancel_stream() { + auto notify = this->defer_notify(); for (auto& _sender_queue : _sender_queues) { _sender_queue->cancel(); } } void DataStreamRecvr::close() { + auto notify = this->defer_notify(); if (_closed) { return; } @@ -300,6 +311,7 @@ Status DataStreamRecvr::get_chunk(std::unique_ptr* chunk) { } Status DataStreamRecvr::get_chunk_for_pipeline(std::unique_ptr* chunk, const int32_t driver_sequence) { + // TODO: notify here DCHECK(!_is_merging); DCHECK_EQ(_sender_queues.size(), 1); Chunk* tmp_chunk = nullptr; @@ -309,6 +321,7 @@ Status DataStreamRecvr::get_chunk_for_pipeline(std::unique_ptr* chunk, co } void DataStreamRecvr::short_circuit_for_pipeline(const int32_t driver_sequence) { + auto notify = this->defer_notify(); DCHECK(_is_pipeline); auto* sender_queue = static_cast(_sender_queues[0]); return sender_queue->short_circuit(driver_sequence); diff --git a/be/src/runtime/data_stream_recvr.h b/be/src/runtime/data_stream_recvr.h index 8172b789b2391..9e1b5fd1c4250 100644 --- a/be/src/runtime/data_stream_recvr.h +++ b/be/src/runtime/data_stream_recvr.h @@ -39,11 +39,14 @@ #include "column/vectorized_fwd.h" #include "common/object_pool.h" #include "common/status.h" +#include "exec/pipeline/pipeline_fwd.h" +#include "exec/pipeline/schedule/observer.h" #include "exec/sorting/merge_path.h" #include "gen_cpp/Types_types.h" // for TUniqueId #include "runtime/descriptors.h" #include "runtime/local_pass_through_buffer.h" #include "runtime/query_statistics.h" +#include "util/defer_op.h" #include "util/runtime_profile.h" namespace google::protobuf { @@ -132,6 +135,18 @@ class DataStreamRecvr { bool get_encode_level() const { return _encode_level; } + void attach_query_ctx(pipeline::QueryContext* query_ctx); + void attach_observer(RuntimeState* state, pipeline::PipelineObserver* observer) { + _observable.add_observer(state, observer); + } + auto defer_notify() { + return DeferOp([query_ctx = _query_ctx, this]() { + if (auto ctx = query_ctx.lock()) { + this->_observable.notify_source_observers(); + } + }); + } + private: friend class DataStreamMgr; class SenderQueue; @@ -234,6 +249,12 @@ class DataStreamRecvr { // concurrency of the brpc threads, so we let the size of _metrics to be the same as the pipeline's dop, // and use round-robin to choose the metrics for each brpc thread. std::vector _metrics; + + // used in event scheduler + // Capture shared_ptr to avoid use-after-free. + std::weak_ptr _query_ctx; + pipeline::Observable _observable; + std::atomic _rpc_round_roubin_index = 0; // Sub plan query statistics receiver. diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index b3c1f2774560e..c0a4dc1269023 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -47,6 +47,7 @@ #include "exec/pipeline/driver_limiter.h" #include "exec/pipeline/pipeline_driver_executor.h" #include "exec/pipeline/query_context.h" +#include "exec/pipeline/schedule/pipeline_timer.h" #include "exec/spill/dir_manager.h" #include "exec/workgroup/pipeline_executor_set.h" #include "exec/workgroup/scan_executor.h" @@ -422,6 +423,9 @@ Status ExecEnv::init(const std::vector& store_paths, bool as_cn) { return (driver_limiter == nullptr) ? 0 : driver_limiter->num_total_drivers(); }); + _pipeline_timer = new pipeline::PipelineTimer(); + RETURN_IF_ERROR(_pipeline_timer->start()); + const int num_io_threads = config::pipeline_scan_thread_pool_thread_num <= 0 ? CpuInfo::num_cores() : config::pipeline_scan_thread_pool_thread_num; @@ -734,6 +738,7 @@ void ExecEnv::destroy() { // _query_pool_mem_tracker. SAFE_DELETE(_runtime_filter_cache); SAFE_DELETE(_driver_limiter); + SAFE_DELETE(_pipeline_timer); SAFE_DELETE(_broker_client_cache); SAFE_DELETE(_frontend_client_cache); SAFE_DELETE(_backend_client_cache); diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 4ff0acd1f0d5c..5cc0360f3f6c1 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -39,6 +39,8 @@ #include #include "common/status.h" +#include "exec/pipeline/pipeline_fwd.h" +#include "exec/pipeline/schedule/pipeline_timer.h" #include "exec/query_cache/cache_manager.h" #include "exec/workgroup/work_group_fwd.h" #include "runtime/base_load_path_mgr.h" @@ -92,6 +94,7 @@ namespace pipeline { class DriverExecutor; class QueryContextManager; class DriverLimiter; +class PipelineTimer; } // namespace pipeline namespace lake { @@ -318,6 +321,7 @@ class ExecEnv { pipeline::QueryContextManager* query_context_mgr() { return _query_context_mgr; } pipeline::DriverLimiter* driver_limiter() { return _driver_limiter; } + pipeline::PipelineTimer* pipeline_timer() const { return _pipeline_timer; } int64_t max_executor_threads() const { return _max_executor_threads; } @@ -376,6 +380,7 @@ class ExecEnv { pipeline::QueryContextManager* _query_context_mgr = nullptr; std::unique_ptr _workgroup_manager; pipeline::DriverLimiter* _driver_limiter = nullptr; + pipeline::PipelineTimer* _pipeline_timer = nullptr; int64_t _max_executor_threads = 0; // Max thread number of executor BaseLoadPathMgr* _load_path_mgr = nullptr; diff --git a/be/src/runtime/sender_queue.cpp b/be/src/runtime/sender_queue.cpp index 78c3d15a5919e..769cc71e4fa1d 100644 --- a/be/src/runtime/sender_queue.cpp +++ b/be/src/runtime/sender_queue.cpp @@ -561,6 +561,7 @@ void DataStreamRecvr::PipelineSenderQueue::close() { void DataStreamRecvr::PipelineSenderQueue::clean_buffer_queues() { std::lock_guard l(_lock); + tls_thread_status.set_mem_tracker(nullptr); auto& metrics = _recvr->_metrics[0]; for (size_t i = 0; i < _chunk_queues.size(); i++) { auto& chunk_queue = _chunk_queues[i]; @@ -814,6 +815,7 @@ Status DataStreamRecvr::PipelineSenderQueue::add_chunks(const PTransmitChunkPara _total_chunks++; // Double check here for short circuit compatibility without introducing a critical section if (_chunk_queue_states[index].is_short_circuited.load(std::memory_order_relaxed)) { + tls_thread_status.set_mem_tracker(nullptr); short_circuit(index); // We cannot early-return for short circuit, it may occur for parts of parallelism, // and the other parallelism may need to proceed. diff --git a/test/sql/test_pipeline/R/test_event_scheduler b/test/sql/test_pipeline/R/test_event_scheduler new file mode 100644 index 0000000000000..2cae27d35f770 --- /dev/null +++ b/test/sql/test_pipeline/R/test_event_scheduler @@ -0,0 +1,101 @@ +-- name: test_event_scheduler +set enable_pipeline_event_scheduler=true; +-- result: +-- !result +set enable_group_execution=false; +-- result: +-- !result +set enable_per_bucket_optimize=false; +-- result: +-- !result +CREATE TABLE `t0` ( + `c0` int DEFAULT NULL, + `c1` bigint DEFAULT NULL, + `c2` string DEFAULT NULL +) ENGINE=OLAP +DUPLICATE KEY(`c0`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`c0`) BUCKETS 1 +PROPERTIES ( +"replication_num" = "1" +); +-- result: +-- !result +set pipeline_dop = 1; +-- result: +-- !result +set io_tasks_per_scan_operator=1; +-- result: +-- !result +insert into t0 SELECT generate_series, generate_series, generate_series FROM TABLE(generate_series(1, 4)); +-- result: +-- !result +select * from t0; +-- result: +1 1 1 +2 2 2 +3 3 3 +4 4 4 +-- !result +insert into t0 SELECT generate_series, generate_series, generate_series FROM TABLE(generate_series(1, 409600)); +-- result: +-- !result +select c0 from t0 where c0 is null; +-- result: +-- !result +select count(c1) from t0; +-- result: +409604 +-- !result +CREATE TABLE `t1` ( + `c0` int DEFAULT NULL, + `c1` bigint DEFAULT NULL, + `c2` string DEFAULT NULL +) ENGINE=OLAP +DUPLICATE KEY(`c0`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`c0`) BUCKETS 4 +PROPERTIES ( +"replication_num" = "1" +); +-- result: +-- !result +set io_tasks_per_scan_operator=4; +-- result: +-- !result +insert into t1 SELECT generate_series, generate_series, generate_series FROM TABLE(generate_series(1, 409600)); +-- result: +-- !result +select count(c1) from t1; +-- result: +409600 +-- !result +insert into blackhole() select distinct c1,c2,c0 from t1; +-- result: +-- !result +insert into blackhole() select c2,sum(c0),c1 from t1 group by c1,c2; +-- result: +-- !result +set pipeline_dop=2; +-- result: +-- !result +select count(c1) from t1; +-- result: +409600 +-- !result +select count(*) from (select * from t1 limit 1000) t; +-- result: +1000 +-- !result +set streaming_preaggregation_mode="force_streaming"; +-- result: +-- !result +insert into blackhole() select distinct c1,c2,c0 from t1; +-- result: +-- !result +set streaming_preaggregation_mode="force_streaming"; +-- result: +-- !result +insert into blackhole() select sum(c0),c1,c2 from t1 group by c1, c2; +-- result: +-- !result \ No newline at end of file diff --git a/test/sql/test_pipeline/T/test_event_scheduler b/test/sql/test_pipeline/T/test_event_scheduler new file mode 100644 index 0000000000000..425e0e232fe06 --- /dev/null +++ b/test/sql/test_pipeline/T/test_event_scheduler @@ -0,0 +1,65 @@ +-- name: test_event_scheduler +set enable_pipeline_event_scheduler=true; +set enable_group_execution=false; +set enable_per_bucket_optimize=false; + +-- basic scan test: +CREATE TABLE `t0` ( + `c0` int DEFAULT NULL, + `c1` bigint DEFAULT NULL, + `c2` string DEFAULT NULL +) ENGINE=OLAP +DUPLICATE KEY(`c0`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`c0`) BUCKETS 1 +PROPERTIES ( +"replication_num" = "1" +); +-- +set pipeline_dop = 1; +set io_tasks_per_scan_operator=1; + +insert into t0 SELECT generate_series, generate_series, generate_series FROM TABLE(generate_series(1, 4)); +select * from t0; +insert into t0 SELECT generate_series, generate_series, generate_series FROM TABLE(generate_series(1, 409600)); +-- scan has output always false +select c0 from t0 where c0 is null; + +-- simple agg without local-exchange once stage agg +select count(c1) from t0; + +-- multi-scan tasks +CREATE TABLE `t1` ( + `c0` int DEFAULT NULL, + `c1` bigint DEFAULT NULL, + `c2` string DEFAULT NULL +) ENGINE=OLAP +DUPLICATE KEY(`c0`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`c0`) BUCKETS 4 +PROPERTIES ( +"replication_num" = "1" +); +set io_tasks_per_scan_operator=4; +insert into t1 SELECT generate_series, generate_series, generate_series FROM TABLE(generate_series(1, 409600)); +-- simple with local-exchange, two stage block agg +select count(c1) from t1; +-- simple with local-exchange, two stage streaming distinct agg +insert into blackhole() select distinct c1,c2,c0 from t1; +-- simple with local-exchange, two stage streaming agg +insert into blackhole() select c2,sum(c0),c1 from t1 group by c1,c2; + +set pipeline_dop=2; +-- test with shared morsel queue +select count(c1) from t1; + +-- test exchange with limit +select count(*) from (select * from t1 limit 1000) t; + +-- test with force streaming +set streaming_preaggregation_mode="force_streaming"; +insert into blackhole() select distinct c1,c2,c0 from t1; + +-- test with force streaming +set streaming_preaggregation_mode="force_streaming"; +insert into blackhole() select sum(c0),c1,c2 from t1 group by c1, c2;