From ba1ebda578810da1b8eab1b58849d2e234a7fc31 Mon Sep 17 00:00:00 2001 From: stdpain Date: Tue, 24 Dec 2024 14:06:42 +0800 Subject: [PATCH 01/10] [Feature] support phased scheduler Signed-off-by: stdpain --- be/src/exec/CMakeLists.txt | 4 + be/src/exec/pipeline/fragment_context.cpp | 6 + be/src/exec/pipeline/fragment_context.h | 7 + be/src/exec/pipeline/operator.h | 8 + be/src/exec/pipeline/schedule/common.h | 19 +++ .../pipeline/schedule/event_scheduler.cpp | 87 ++++++++++ .../exec/pipeline/schedule/event_scheduler.h | 42 +++++ be/src/exec/pipeline/schedule/observer.cpp | 79 +++++++++ be/src/exec/pipeline/schedule/observer.h | 154 +++++++++++++++++ .../exec/pipeline/schedule/pipeline_timer.cpp | 74 ++++++++ .../exec/pipeline/schedule/pipeline_timer.h | 79 +++++++++ .../exec/pipeline/schedule/timeout_tasks.cpp | 42 +++++ be/src/exec/pipeline/schedule/timeout_tasks.h | 46 +++++ be/src/exec/pipeline/schedule/utils.h | 42 +++++ be/src/runtime/runtime_state.h | 5 + be/src/util/defer_op.h | 6 +- be/test/CMakeLists.txt | 1 + .../exec/pipeline/schedule/observer_test.cpp | 161 ++++++++++++++++++ .../com/starrocks/qe/SessionVariable.java | 6 + gensrc/thrift/InternalService.thrift | 2 + 20 files changed, 869 insertions(+), 1 deletion(-) create mode 100644 be/src/exec/pipeline/schedule/common.h create mode 100644 be/src/exec/pipeline/schedule/event_scheduler.cpp create mode 100644 be/src/exec/pipeline/schedule/event_scheduler.h create mode 100644 be/src/exec/pipeline/schedule/observer.cpp create mode 100644 be/src/exec/pipeline/schedule/observer.h create mode 100644 be/src/exec/pipeline/schedule/pipeline_timer.cpp create mode 100644 be/src/exec/pipeline/schedule/pipeline_timer.h create mode 100644 be/src/exec/pipeline/schedule/timeout_tasks.cpp create mode 100644 be/src/exec/pipeline/schedule/timeout_tasks.h create mode 100644 be/src/exec/pipeline/schedule/utils.h create mode 100644 be/test/exec/pipeline/schedule/observer_test.cpp diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 3b0ca0fc6da1d..d9ef2cb9cfd5f 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -286,6 +286,10 @@ set(EXEC_FILES pipeline/group_execution/execution_group.cpp pipeline/group_execution/execution_group_builder.cpp pipeline/group_execution/group_operator.cpp + pipeline/schedule/event_scheduler.cpp + pipeline/schedule/observer.cpp + pipeline/schedule/pipeline_timer.cpp + pipeline/schedule/timeout_tasks.cpp workgroup/pipeline_executor_set.cpp workgroup/pipeline_executor_set_manager.cpp workgroup/work_group.cpp diff --git a/be/src/exec/pipeline/fragment_context.cpp b/be/src/exec/pipeline/fragment_context.cpp index a682bd8469f3b..74564dcd10cd8 100644 --- a/be/src/exec/pipeline/fragment_context.cpp +++ b/be/src/exec/pipeline/fragment_context.cpp @@ -404,4 +404,10 @@ void FragmentContext::_close_stream_load_contexts() { } } +void FragmentContext::init_event_scheduler() { + _event_scheduler = std::make_unique(); + runtime_state()->runtime_profile()->add_info_string("EnableEventScheduler", + enable_event_scheduler() ? "true" : "false"); +} + } // namespace starrocks::pipeline diff --git a/be/src/exec/pipeline/fragment_context.h b/be/src/exec/pipeline/fragment_context.h index 7c8eb079e4cdc..7d7bd261fdb20 100644 --- a/be/src/exec/pipeline/fragment_context.h +++ b/be/src/exec/pipeline/fragment_context.h @@ -25,6 +25,7 @@ #include "exec/pipeline/pipeline_fwd.h" #include "exec/pipeline/runtime_filter_types.h" #include "exec/pipeline/scan/morsel.h" +#include "exec/pipeline/schedule/event_scheduler.h" #include "exec/query_cache/cache_param.h" #include "gen_cpp/FrontendService.h" #include "gen_cpp/HeartbeatService.h" @@ -174,6 +175,10 @@ class FragmentContext { // acquire runtime filter from cache void acquire_runtime_filters(); + bool enable_event_scheduler() const { return event_scheduler() != nullptr; } + EventScheduler* event_scheduler() const { return _event_scheduler.get(); } + void init_event_scheduler(); + private: void _close_stream_load_contexts(); @@ -235,6 +240,8 @@ class FragmentContext { RuntimeProfile::Counter* _jit_timer = nullptr; bool _report_when_finish{}; + + std::unique_ptr _event_scheduler; }; class FragmentContextManager { diff --git a/be/src/exec/pipeline/operator.h b/be/src/exec/pipeline/operator.h index 87719e367874c..2e92613ac7616 100644 --- a/be/src/exec/pipeline/operator.h +++ b/be/src/exec/pipeline/operator.h @@ -19,6 +19,7 @@ #include "column/vectorized_fwd.h" #include "common/statusor.h" #include "exec/pipeline/runtime_filter_types.h" +#include "exec/pipeline/schedule/observer.h" #include "exec/spill/operator_mem_resource_manager.h" #include "exprs/runtime_filter_bank.h" #include "gutil/strings/substitute.h" @@ -269,6 +270,9 @@ class Operator { virtual void update_exec_stats(RuntimeState* state); + void set_observer(PipelineObserver* observer) { _observer = observer; } + PipelineObserver* observer() const { return _observer; } + protected: OperatorFactory* _factory; const int32_t _id; @@ -323,6 +327,8 @@ class Operator { // such as OlapScanOperator( use separated IO thread to execute the IO task) std::atomic_int64_t _last_growth_cpu_time_ns = 0; + PipelineObserver* _observer = nullptr; + private: void _init_rf_counters(bool init_bloom); void _init_conjuct_counters(); @@ -408,6 +414,8 @@ class OperatorFactory { // try to get runtime filter from cache void acquire_runtime_filter(RuntimeState* state); + virtual bool support_event_scheduler() const { return false; } + protected: void _prepare_runtime_in_filters(RuntimeState* state); void _prepare_runtime_holders(const std::vector& holders, diff --git a/be/src/exec/pipeline/schedule/common.h b/be/src/exec/pipeline/schedule/common.h new file mode 100644 index 0000000000000..71500c1bfaf53 --- /dev/null +++ b/be/src/exec/pipeline/schedule/common.h @@ -0,0 +1,19 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#define TRACE_SCHEDULE_LOG VLOG(10) + +#define SCHEDULE_CHECK CHECK diff --git a/be/src/exec/pipeline/schedule/event_scheduler.cpp b/be/src/exec/pipeline/schedule/event_scheduler.cpp new file mode 100644 index 0000000000000..89ce66e343fe7 --- /dev/null +++ b/be/src/exec/pipeline/schedule/event_scheduler.cpp @@ -0,0 +1,87 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "exec/pipeline/schedule/event_scheduler.h" + +#include "exec/pipeline/pipeline_driver.h" +#include "exec/pipeline/pipeline_driver_queue.h" +#include "exec/pipeline/pipeline_fwd.h" +#include "exec/pipeline/schedule/common.h" +#include "exec/pipeline/schedule/utils.h" + +namespace starrocks::pipeline { + +void EventScheduler::add_blocked_driver(const DriverRawPtr driver) { + // Capture query-context is needed before calling reschedule to avoid UAF + auto query_ctx = driver->fragment_ctx()->runtime_state()->query_ctx()->shared_from_this(); + SCHEDULE_CHECK(!driver->is_in_block_queue()); + driver->set_in_block_queue(true); + TRACE_SCHEDULE_LOG << "TRACE add to block queue:" << driver << "," << driver->to_readable_string(); + auto token = driver->acquire_schedule_token(); + // The driver is ready put to block queue. but is_in_block_queue is false, but the driver is active. + // set this flag to make the block queue should check the driver is active + if (!token.acquired() || driver->need_check_reschedule()) { + driver->observer()->cancel_update(); + } +} + +// For a single driver try_schedule has no concurrency. +void EventScheduler::try_schedule(const DriverRawPtr driver) { + SCHEDULE_CHECK(driver->is_in_block_queue()); + bool add_to_ready_queue = false; + RACE_DETECT(driver->schedule); + + // The logic in the pipeline poller is basically the same. + auto fragment_ctx = driver->fragment_ctx(); + if (fragment_ctx->is_canceled()) { + add_to_ready_queue = on_cancel(driver); + } else if (driver->need_report_exec_state()) { + add_to_ready_queue = true; + } else if (driver->pending_finish()) { + if (!driver->is_still_pending_finish()) { + driver->set_driver_state(fragment_ctx->is_canceled() ? DriverState::CANCELED : DriverState::FINISH); + add_to_ready_queue = true; + } + } else if (driver->is_finished()) { + add_to_ready_queue = true; + } else { + auto status_or_is_not_blocked = driver->is_not_blocked(); + if (!status_or_is_not_blocked.ok()) { + fragment_ctx->cancel(status_or_is_not_blocked.status()); + add_to_ready_queue = on_cancel(driver); + } else if (status_or_is_not_blocked.value()) { + driver->set_driver_state(DriverState::READY); + add_to_ready_queue = true; + } + } + + if (add_to_ready_queue) { + TRACE_SCHEDULE_LOG << "TRACE schedule driver:" << driver << " to ready queue"; + driver->set_need_check_reschedule(false); + driver->set_in_block_queue(false); + _driver_queue->put_back(driver); + } +} + +bool EventScheduler::on_cancel(DriverRawPtr driver) { + driver->cancel_operators(driver->fragment_ctx()->runtime_state()); + if (driver->is_still_pending_finish()) { + driver->set_driver_state(DriverState::PENDING_FINISH); + return false; + } else { + driver->set_driver_state(DriverState::CANCELED); + return true; + } +} +} // namespace starrocks::pipeline \ No newline at end of file diff --git a/be/src/exec/pipeline/schedule/event_scheduler.h b/be/src/exec/pipeline/schedule/event_scheduler.h new file mode 100644 index 0000000000000..e790a82681931 --- /dev/null +++ b/be/src/exec/pipeline/schedule/event_scheduler.h @@ -0,0 +1,42 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "exec/pipeline/pipeline_fwd.h" +#include "gutil/macros.h" + +namespace starrocks::pipeline { +class DriverQueue; +class EventScheduler { +public: + EventScheduler() = default; + DISALLOW_COPY(EventScheduler); + + void add_blocked_driver(const DriverRawPtr driver); + + void try_schedule(const DriverRawPtr driver); + + bool on_cancel(DriverRawPtr driver); + + void attach_queue(DriverQueue* queue) { + if (_driver_queue == nullptr) { + _driver_queue = queue; + } + } + +private: + DriverQueue* _driver_queue = nullptr; +}; +} // namespace starrocks::pipeline \ No newline at end of file diff --git a/be/src/exec/pipeline/schedule/observer.cpp b/be/src/exec/pipeline/schedule/observer.cpp new file mode 100644 index 0000000000000..c20018ee2fa4e --- /dev/null +++ b/be/src/exec/pipeline/schedule/observer.cpp @@ -0,0 +1,79 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "exec/pipeline/schedule/observer.h" + +#include "exec/pipeline/pipeline_driver.h" +#include "exec/pipeline/schedule/common.h" + +namespace starrocks::pipeline { +static void on_update(PipelineDriver* driver) { + auto sink = driver->sink_operator(); + auto source = driver->source_operator(); + TRACE_SCHEDULE_LOG << "notify driver:" << driver << " state:" << driver->driver_state() + << " in_block_queue:" << driver->is_in_block_queue() + << " source finished:" << source->is_finished() + << " operator has output:" << source->has_output() << " sink finished:" << sink->is_finished() + << " sink need input:" << sink->need_input() << ":" << driver->to_readable_string(); + if (sink->is_finished() || sink->need_input() || source->is_finished() || source->has_output()) { + driver->fragment_ctx()->event_scheduler()->try_schedule(driver); + } +} + +static void on_sink_update(PipelineDriver* driver) { + auto sink = driver->sink_operator(); + if (sink->is_finished() || sink->need_input()) { + driver->fragment_ctx()->event_scheduler()->try_schedule(driver); + } +} + +static void on_source_update(PipelineDriver* driver) { + auto source = driver->source_operator(); + if (source->is_finished() || source->has_output()) { + driver->fragment_ctx()->event_scheduler()->try_schedule(driver); + } +} + +void PipelineObserver::_do_update(int event) { + auto driver = _driver; + auto token = driver->acquire_schedule_token(); + + if (driver->is_in_block_queue()) { + bool pipeline_block = driver->driver_state() != DriverState::INPUT_EMPTY || + driver->driver_state() != DriverState::OUTPUT_FULL; + if (pipeline_block || _is_cancel_changed(event)) { + driver->fragment_ctx()->event_scheduler()->try_schedule(driver); + } else if (_is_all_changed(event)) { + on_update(driver); + } else if (_is_source_changed(event)) { + on_source_update(driver); + } else if (_is_sink_changed(event)) { + on_sink_update(driver); + } else { + // nothing to do + } + } else { + driver->set_need_check_reschedule(true); + } +} + +std::string Observable::to_string() const { + std::string str; + for (auto* observer : _observers) { + str += observer->driver()->to_readable_string() + "\n"; + } + return str; +} + +} // namespace starrocks::pipeline \ No newline at end of file diff --git a/be/src/exec/pipeline/schedule/observer.h b/be/src/exec/pipeline/schedule/observer.h new file mode 100644 index 0000000000000..a448da2ada2df --- /dev/null +++ b/be/src/exec/pipeline/schedule/observer.h @@ -0,0 +1,154 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +#include "exec/pipeline/pipeline_fwd.h" +#include "exec/pipeline/schedule/common.h" +#include "exec/pipeline/schedule/utils.h" +#include "runtime/runtime_state.h" +#include "util/defer_op.h" +#include "util/race_detect.h" + +namespace starrocks::pipeline { +class SourceOperator; + +// Pipeline observer objects. the Observable issues an event to the Observer. +// thread-safe. +// If the driver satisfies the ready state, +// the observer adds the corresponding driver to the ready driver queue. +// If the pipeline driver is not in the block queue, then the pipeline driver tries to tag it. +// We need to handle this in the event scheduler. +class PipelineObserver { +public: + PipelineObserver(DriverRawPtr driver) : _driver(driver) {} + + DISALLOW_COPY_AND_MOVE(PipelineObserver); + + void source_update() { + _active_event(SOURCE_CHANGE_EVENT); + _update([this](int event) { _do_update(event); }); + } + + void sink_update() { + _active_event(SINK_CHANGE_EVENT); + _update([this](int event) { _do_update(event); }); + } + + void cancel_update() { + _active_event(CANCEL_EVENT); + _update([this](int event) { _do_update(event); }); + } + + void all_update() { + _active_event(SOURCE_CHANGE_EVENT | SINK_CHANGE_EVENT); + _update([this](int event) { _do_update(event); }); + } + + DriverRawPtr driver() const { return _driver; } + +private: + template + void _update(DoUpdate&& callback) { + int event = 0; + AtomicRequestControler(_pending_event_cnt, [&]() { + RACE_DETECT(detect_do_update); + event |= _fetch_event(); + callback(event); + }); + } + +private: + static constexpr inline int32_t CANCEL_EVENT = 1 << 2; + static constexpr inline int32_t SINK_CHANGE_EVENT = 1 << 1; + static constexpr inline int32_t SOURCE_CHANGE_EVENT = 1; + + void _do_update(int event); + // fetch event + int _fetch_event() { return _events.fetch_and(0, std::memory_order_acq_rel); } + + bool _is_sink_changed(int event) { return event & SINK_CHANGE_EVENT; } + bool _is_source_changed(int event) { return event & SOURCE_CHANGE_EVENT; } + bool _is_cancel_changed(int event) { return event & CANCEL_EVENT; } + bool _is_all_changed(int event) { return _is_source_changed(event) && _is_sink_changed(event); } + + void _active_event(int event) { _events.fetch_or(event, std::memory_order_acq_rel); } + +private: + DECLARE_RACE_DETECTOR(detect_do_update) + DriverRawPtr _driver = nullptr; + std::atomic_int32_t _pending_event_cnt{}; + std::atomic_int32_t _events{}; +}; + +class Observable; +class Observable { +public: + Observable() = default; + Observable(const Observable&) = delete; + Observable& operator=(const Observable&) = delete; + + // Non-thread-safe, we only allow the need to do this in the fragment->prepare phase. + void add_observer(RuntimeState* state, PipelineObserver* observer) { + if (state->enable_event_scheduler()) { + _observers.push_back(observer); + } + } + + void notify_source_observers() { + for (auto* observer : _observers) { + observer->source_update(); + } + } + void notify_sink_observers() { + for (auto* observer : _observers) { + observer->sink_update(); + } + } + + size_t num_observers() const { return _observers.size(); } + + std::string to_string() const; + +private: + std::vector _observers; +}; + +// Lots of simple operators use a sink -> source one-to-one pipeline . +// We use this to simplify the development of this class of operators. +class PipeObservable { +public: + void attach_sink_observer(RuntimeState* state, pipeline::PipelineObserver* observer) { + _sink_observable.add_observer(state, observer); + } + void attach_source_observer(RuntimeState* state, pipeline::PipelineObserver* observer) { + _source_observable.add_observer(state, observer); + } + + auto defer_notify_source() { + return DeferOp([this]() { _source_observable.notify_source_observers(); }); + } + auto defer_notify_sink() { + return DeferOp([this]() { _sink_observable.notify_source_observers(); }); + } + +private: + Observable _sink_observable; + Observable _source_observable; +}; + +} // namespace starrocks::pipeline diff --git a/be/src/exec/pipeline/schedule/pipeline_timer.cpp b/be/src/exec/pipeline/schedule/pipeline_timer.cpp new file mode 100644 index 0000000000000..d71636e992522 --- /dev/null +++ b/be/src/exec/pipeline/schedule/pipeline_timer.cpp @@ -0,0 +1,74 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "exec/pipeline/schedule/pipeline_timer.h" + +#include +#include +#include + +#include "bthread/timer_thread.h" +#include "common/status.h" +#include "fmt/format.h" + +namespace starrocks::pipeline { + +void PipelineTimerTask::waitUtilFinished() { + if (_finished.load(std::memory_order_acquire)) { + return; + } + _has_consumer.store(true, std::memory_order_release); + std::unique_lock lock(_mutex); + while (!_finished) { + _cv.wait(lock); + } +} + +void PipelineTimerTask::unschedule(PipelineTimer* timer) { + int rc = timer->unschedule(this); + if (rc == 1) { + waitUtilFinished(); + } +} + +Status PipelineTimer::start() { + _thr = std::make_shared(); + bthread::TimerThreadOptions options; + options.bvar_prefix = "pipeline_timer"; + int rc = _thr->start(&options); + if (rc != 0) { + return Status::InternalError(fmt::format("init pipeline timer error:{}", berror(errno))); + } + return Status::OK(); +} + +static void RunTimerTask(void* arg) { + auto* task = static_cast(arg); + task->doRun(); +} + +Status PipelineTimer::schedule(PipelineTimerTask* task, const timespec& abstime) { + TaskId tid = _thr->schedule(RunTimerTask, task, abstime); + if (tid == 0) { + return Status::InternalError(fmt::format("pipeline timer schedule task error:{}", berror(errno))); + } + task->set_tid(tid); + return Status::OK(); +} + +int PipelineTimer::unschedule(PipelineTimerTask* task) { + return _thr->unschedule(task->tid()); +} + +} // namespace starrocks::pipeline \ No newline at end of file diff --git a/be/src/exec/pipeline/schedule/pipeline_timer.h b/be/src/exec/pipeline/schedule/pipeline_timer.h new file mode 100644 index 0000000000000..0de51d8ac2da7 --- /dev/null +++ b/be/src/exec/pipeline/schedule/pipeline_timer.h @@ -0,0 +1,79 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +#include "common/status.h" + +namespace bthread { +class TimerThread; +} + +namespace starrocks::pipeline { +using TaskId = int64_t; +class PipelineTimer; + +class PipelineTimerTask { +public: + virtual ~PipelineTimerTask() = default; + + void doRun() { + Run(); + _finished.store(true, std::memory_order_seq_cst); + if (_has_consumer.load(std::memory_order_acquire)) { + _cv.notify_one(); + } + } + + // only call when unschedule == 1 + void waitUtilFinished(); + void unschedule(PipelineTimer* timer); + + void set_tid(TaskId tid) { _tid = tid; } + TaskId tid() const { return _tid; } + +protected: + // implement interface + virtual void Run() = 0; + +protected: + std::atomic _finished{}; + std::atomic _has_consumer{}; + TaskId _tid{}; + std::mutex _mutex; + std::condition_variable _cv; +}; + +class PipelineTimer { +public: + PipelineTimer() = default; + ~PipelineTimer() noexcept = default; + + Status start(); + + Status schedule(PipelineTimerTask* task, const timespec& abstime); + // 0 - Removed the task which does not run yet + // -1 - The task does not exist. + // 1 - The task is just running. + int unschedule(PipelineTimerTask* task); + +private: + std::shared_ptr _thr; +}; +} // namespace starrocks::pipeline \ No newline at end of file diff --git a/be/src/exec/pipeline/schedule/timeout_tasks.cpp b/be/src/exec/pipeline/schedule/timeout_tasks.cpp new file mode 100644 index 0000000000000..631631500642f --- /dev/null +++ b/be/src/exec/pipeline/schedule/timeout_tasks.cpp @@ -0,0 +1,42 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "exec/pipeline/schedule/timeout_tasks.h" + +#include "exec/pipeline/fragment_context.h" +#include "exec/pipeline/pipeline_fwd.h" +#include "exec/pipeline/schedule/common.h" +#include "util/stack_util.h" + +namespace starrocks::pipeline { +void CheckFragmentTimeout::Run() { + auto query_ctx = _fragment_ctx->runtime_state()->query_ctx(); + size_t expire_seconds = query_ctx->get_query_expire_seconds(); + TRACE_SCHEDULE_LOG << "fragment_instance_id:" << print_id(_fragment_ctx->fragment_instance_id()); + _fragment_ctx->cancel(Status::TimedOut(fmt::format("Query exceeded time limit of {} seconds", expire_seconds))); + + _fragment_ctx->iterate_drivers([](const DriverPtr& driver) { + driver->set_need_check_reschedule(true); + if (driver->is_in_block_queue()) { + TRACE_SCHEDULE_LOG << "[Driver] Timeout " << driver->to_readable_string(); + driver->observer()->cancel_update(); + } + }); +} + +void RFScanWaitTimeout::Run() { + _timeout.notify_source_observers(); +} + +} // namespace starrocks::pipeline \ No newline at end of file diff --git a/be/src/exec/pipeline/schedule/timeout_tasks.h b/be/src/exec/pipeline/schedule/timeout_tasks.h new file mode 100644 index 0000000000000..b6b5ef2ad5369 --- /dev/null +++ b/be/src/exec/pipeline/schedule/timeout_tasks.h @@ -0,0 +1,46 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "exec/pipeline/schedule/observer.h" +#include "exec/pipeline/schedule/pipeline_timer.h" +#include "runtime/descriptors.h" + +namespace starrocks::pipeline { +class FragmentContext; + +// TimerTask object, fragment->cancel is called if the timeout is reached. +class CheckFragmentTimeout final : public PipelineTimerTask { +public: + CheckFragmentTimeout(FragmentContext* fragment_ctx) : _fragment_ctx(fragment_ctx) {} + void Run() override; + +private: + FragmentContext* _fragment_ctx; +}; + +// If the timeout is reached, a cancel_update event is sent to all objects observing _timeout. +class RFScanWaitTimeout final : public PipelineTimerTask { +public: + RFScanWaitTimeout(FragmentContext* fragment_ctx) : _fragment_ctx(fragment_ctx) {} + void add_observer(RuntimeState* state, PipelineObserver* observer) { _timeout.add_observer(state, observer); } + void Run() override; + +private: + FragmentContext* _fragment_ctx; + Observable _timeout; +}; + +} // namespace starrocks::pipeline \ No newline at end of file diff --git a/be/src/exec/pipeline/schedule/utils.h b/be/src/exec/pipeline/schedule/utils.h new file mode 100644 index 0000000000000..288fdd0a50483 --- /dev/null +++ b/be/src/exec/pipeline/schedule/utils.h @@ -0,0 +1,42 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#include + +namespace starrocks { +// wait-free. +// Multi-threaded calls ensure that each pending event is handled. +// Callbacks internally handle multiple events at once. +class AtomicRequestControler { +public: + template + explicit AtomicRequestControler(std::atomic_int32_t& request, CallBack&& callback) : _request(request) { + if (_request.fetch_add(1, std::memory_order_acq_rel) == 0) { + int progress = _request.load(std::memory_order_acquire); + do { + callback(); + } while (_has_more(&progress)); + } + } + +private: + bool _has_more(int* progress) { + return !_request.compare_exchange_strong(*progress, 0, std::memory_order_release, std::memory_order_acquire); + } + +private: + std::atomic_int32_t& _request; +}; +} // namespace starrocks \ No newline at end of file diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index db64995ac3ce4..1025f60cfd29f 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -505,6 +505,9 @@ class RuntimeState { return this->_broadcast_join_right_offsprings; } + bool enable_event_scheduler() const { return _enable_event_scheduler; } + void set_enable_event_scheduler(bool enable) { _enable_event_scheduler = enable; } + private: // Set per-query state. void _init(const TUniqueId& fragment_instance_id, const TQueryOptions& query_options, @@ -646,6 +649,8 @@ class RuntimeState { BroadcastJoinRightOffsprings _broadcast_join_right_offsprings; std::optional _spill_options; + + bool _enable_event_scheduler = false; }; #define LIMIT_EXCEEDED(tracker, state, msg) \ diff --git a/be/src/util/defer_op.h b/be/src/util/defer_op.h index a0bd03d756313..20f37ce13a869 100644 --- a/be/src/util/defer_op.h +++ b/be/src/util/defer_op.h @@ -34,9 +34,10 @@ #pragma once -#include #include +#include "gutil/macros.h" + namespace starrocks { // This class is used to defer a function when this object is deconstruct @@ -47,6 +48,8 @@ class DeferOp { ~DeferOp() noexcept { (void)_func(); } + DISALLOW_COPY_AND_MOVE(DeferOp); + private: DeferFunction _func; }; @@ -61,6 +64,7 @@ class CancelableDefer { } } void cancel() { _cancel = true; } + DISALLOW_COPY_AND_MOVE(CancelableDefer); private: bool _cancel{}; diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index 09d81d78d311a..8195c681389b5 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -47,6 +47,7 @@ set(EXEC_FILES ./exec/paimon/paimon_delete_file_builder_test.cpp ./exec/workgroup/scan_task_queue_test.cpp ./exec/workgroup/pipeline_executor_set_test.cpp + ./exec/pipeline/schedule/observer_test.cpp ./exec/pipeline/pipeline_control_flow_test.cpp ./exec/pipeline/pipeline_driver_queue_test.cpp ./exec/pipeline/pipeline_file_scan_node_test.cpp diff --git a/be/test/exec/pipeline/schedule/observer_test.cpp b/be/test/exec/pipeline/schedule/observer_test.cpp new file mode 100644 index 0000000000000..ec738bc64d8d1 --- /dev/null +++ b/be/test/exec/pipeline/schedule/observer_test.cpp @@ -0,0 +1,161 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include + +#include "butil/time.h" +#include "common/object_pool.h" +#include "exec/pipeline/empty_set_operator.h" +#include "exec/pipeline/group_execution/execution_group.h" +#include "exec/pipeline/noop_sink_operator.h" +#include "exec/pipeline/pipeline_driver.h" +#include "exec/pipeline/pipeline_driver_queue.h" +#include "exec/pipeline/pipeline_fwd.h" +#include "exec/pipeline/schedule/pipeline_timer.h" +#include "exec/pipeline/schedule/utils.h" +#include "gtest/gtest.h" +#include "testutil/assert.h" +#include "util/runtime_profile.h" + +namespace starrocks::pipeline { +TEST(RequestCntlTest, test) { + { + std::atomic_int32_t val{}; + std::vector data; + for (size_t i = 0; i < 10; ++i) { + data.emplace_back(i); + } + int loop = 0; + AtomicRequestControler cntl(val, [&]() { + loop++; + while (!data.empty()) { + data.pop_back(); + } + }); + ASSERT_EQ(loop, 1); + } + { + // test run with concurrency + std::mutex mutex; + std::condition_variable cv; + std::atomic_int32_t val{}; + std::counting_semaphore<> s(0); + std::vector data; + data.emplace_back(0); + std::vector threads; + for (int i = 0; i < 10; ++i) { + threads.emplace_back([&]() { + { + std::unique_lock lock(mutex); + s.release(); + cv.wait(lock); + } + AtomicRequestControler cntl(val, [&]() { + while (!data.empty()) { + data.pop_back(); + } + }); + }); + } + for (size_t i = 0; i < 10; ++i) { + s.acquire(); + } + cv.notify_all(); + for (auto& thread : threads) { + thread.join(); + } + ASSERT_EQ(data.size(), 0); + } +} + +TEST(TimerThreadTest, test) { + PipelineTimer timer; + ASSERT_OK(timer.start()); + { + std::counting_semaphore<> s(0); + int changed = false; + struct Timer : public PipelineTimerTask { + Timer(int32_t& changed_, std::counting_semaphore<>& s_) : changed(changed_), s(s_) {} + void Run() override { + changed = true; + s.release(); + } + int32_t& changed; + std::counting_semaphore<>& s; + }; + Timer noop(changed, s); + // + timespec abstime = butil::microseconds_to_timespec(butil::gettimeofday_us()); + timespec s1 = abstime; + s1.tv_sec -= 10; + // schedule a expired task + ASSERT_OK(timer.schedule(&noop, s1)); + s.acquire(); + noop.unschedule(&timer); + ASSERT_TRUE(changed); + + timespec s2 = abstime; + s2.tv_sec += 3600; + // schedule a task + changed = false; + ASSERT_OK(timer.schedule(&noop, s2)); + sleep(1); + noop.unschedule(&timer); + ASSERT_FALSE(changed); + } +} + +TEST(ObservableTest, test) { + auto dummy_query_ctx = std::make_shared(); + auto dummy_fragment_ctx = std::make_shared(); + auto exec_group = std::make_shared(); + auto runtime_state = std::make_shared(); + runtime_state->_obj_pool = std::make_shared(); + runtime_state->set_query_ctx(dummy_query_ctx.get()); + runtime_state->set_fragment_ctx(dummy_fragment_ctx.get()); + runtime_state->_profile = std::make_shared("dummy"); + dummy_fragment_ctx->set_runtime_state(std::move(runtime_state)); + OpFactories factories; + factories.emplace_back(std::make_shared(0, 1)); + factories.emplace_back(std::make_shared(2, 3)); + + Pipeline pipeline(0, factories, exec_group.get()); + auto operators = pipeline.create_operators(1, 0); + PipelineDriver driver(operators, dummy_query_ctx.get(), dummy_fragment_ctx.get(), &pipeline, 1); + driver.assign_observer(); + ASSERT_OK(driver.prepare(dummy_fragment_ctx->runtime_state())); + + auto driver_queue = std::make_unique(); + dummy_fragment_ctx->init_event_scheduler(); + dummy_fragment_ctx->event_scheduler()->attach_queue(driver_queue.get()); + + driver.set_in_block_queue(true); + driver.set_driver_state(DriverState::INPUT_EMPTY); + // test notify + driver.observer()->all_update(); + ASSERT_OK(driver_queue->take(false)); + driver.observer()->cancel_update(); + ASSERT_OK(driver_queue->take(false)); + driver.observer()->sink_update(); + ASSERT_OK(driver_queue->take(false)); + driver.observer()->source_update(); + driver.observer()->source_update(); +} +} // namespace starrocks::pipeline \ No newline at end of file diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java index ff08fc831ba73..b3ed7ae24a20b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java @@ -577,6 +577,8 @@ public class SessionVariable implements Serializable, Writable, Cloneable { public static final String ENABLE_EXECUTION_ONLY = "enable_execution_only"; + public static final String ENABLE_PIPELINE_EVENT_SCHEDULER = "enable_pipeline_event_scheduler"; + // Flag to control whether to proxy follower's query statement to leader/follower. public enum FollowerQueryForwardMode { DEFAULT, // proxy queries by the follower's replay progress (default) @@ -2387,6 +2389,9 @@ public boolean isCboPredicateSubfieldPath() { @VarAttr(name = ENABLE_PHASED_SCHEDULER) private boolean enablePhasedScheduler = false; + @VarAttr(name = ENABLE_PIPELINE_EVENT_SCHEDULER) + private boolean enablePipelineEventScheduler = false; + @VarAttr(name = PHASED_SCHEDULER_MAX_CONCURRENCY) private int phasedSchedulerMaxConcurrency = 2; @@ -4605,6 +4610,7 @@ public TQueryOptions toThrift() { tResult.setEnable_wait_dependent_event(enableWaitDependentEvent); tResult.setConnector_max_split_size(connectorMaxSplitSize); tResult.setOrc_use_column_names(orcUseColumnNames); + tResult.setEnable_pipeline_event_scheduler(enablePipelineEventScheduler); return tResult; } diff --git a/gensrc/thrift/InternalService.thrift b/gensrc/thrift/InternalService.thrift index 407741dc6bc8a..4a3413bb5a61c 100644 --- a/gensrc/thrift/InternalService.thrift +++ b/gensrc/thrift/InternalService.thrift @@ -321,6 +321,8 @@ struct TQueryOptions { 141: optional i32 datacache_evict_probability; + 142: optional bool enable_pipeline_event_scheduler; + 150: optional map ann_params; 151: optional double pq_refine_factor; 152: optional double k_factor; From cc8f32d5443447e79130f35e9f99b8003c7a78f4 Mon Sep 17 00:00:00 2001 From: stdpain Date: Tue, 24 Dec 2024 15:22:37 +0800 Subject: [PATCH 02/10] save pipeline driver Signed-off-by: stdpain --- be/src/exec/pipeline/pipeline_driver.cpp | 8 +++ be/src/exec/pipeline/pipeline_driver.h | 62 ++++++++++++++++++++++-- 2 files changed, 66 insertions(+), 4 deletions(-) diff --git a/be/src/exec/pipeline/pipeline_driver.cpp b/be/src/exec/pipeline/pipeline_driver.cpp index d895eda16d9ec..026eb52cadf5a 100644 --- a/be/src/exec/pipeline/pipeline_driver.cpp +++ b/be/src/exec/pipeline/pipeline_driver.cpp @@ -25,6 +25,7 @@ #include "exec/pipeline/pipeline_driver_executor.h" #include "exec/pipeline/scan/olap_scan_operator.h" #include "exec/pipeline/scan/scan_operator.h" +#include "exec/pipeline/schedule/timeout_tasks.h" #include "exec/pipeline/source_operator.h" #include "exec/query_cache/cache_operator.h" #include "exec/query_cache/lane_arbiter.h" @@ -170,6 +171,7 @@ Status PipelineDriver::prepare(RuntimeState* runtime_state) { size_t subscribe_filter_sequence = source_op->get_driver_sequence(); _local_rf_holders = fragment_ctx()->runtime_filter_hub()->gather_holders(all_local_rf_set, subscribe_filter_sequence); + if (use_cache) { ssize_t cache_op_idx = -1; query_cache::CacheOperatorPtr cache_op = nullptr; @@ -904,4 +906,10 @@ void PipelineDriver::increment_schedule_times() { driver_acct().increment_schedule_times(); } +void PipelineDriver::assign_observer() { + for (const auto& op : _operators) { + op->set_observer(&_observer); + } +} + } // namespace starrocks::pipeline diff --git a/be/src/exec/pipeline/pipeline_driver.h b/be/src/exec/pipeline/pipeline_driver.h index 94870fe87e293..2c352db9159ef 100644 --- a/be/src/exec/pipeline/pipeline_driver.h +++ b/be/src/exec/pipeline/pipeline_driver.h @@ -29,6 +29,8 @@ #include "exec/pipeline/runtime_filter_types.h" #include "exec/pipeline/scan/morsel.h" #include "exec/pipeline/scan/scan_operator.h" +#include "exec/pipeline/schedule/common.h" +#include "exec/pipeline/schedule/observer.h" #include "exec/pipeline/source_operator.h" #include "exec/workgroup/work_group_fwd.h" #include "exprs/runtime_filter_bank.h" @@ -195,6 +197,26 @@ enum OperatorStage { class PipelineDriver { friend class PipelineDriverPoller; +public: + class ScheduleToken { + public: + ScheduleToken(DriverRawPtr driver, bool acquired) : _driver(driver), _acquired(acquired) {} + ~ScheduleToken() { + if (_acquired) { + _driver->_schedule_token = true; + } + } + + ScheduleToken(const ScheduleToken&) = delete; + void operator=(const ScheduleToken&) = delete; + + bool acquired() const { return _acquired; } + + private: + DriverRawPtr _driver; + bool _acquired; + }; + public: PipelineDriver(const Operators& operators, QueryContext* query_ctx, FragmentContext* fragment_ctx, Pipeline* pipeline, int32_t driver_id) @@ -203,11 +225,13 @@ class PipelineDriver { _fragment_ctx(fragment_ctx), _pipeline(pipeline), _source_node_id(operators[0]->get_plan_node_id()), - _driver_id(driver_id) { + _driver_id(driver_id), + _observer(this) { _runtime_profile = std::make_shared(strings::Substitute("PipelineDriver (id=$0)", _driver_id)); for (auto& op : _operators) { _operator_stages[op->get_id()] = OperatorStage::INIT; } + _driver_name = fmt::sprintf("driver_%d_%d", _source_node_id, _driver_id); } @@ -336,7 +360,6 @@ class PipelineDriver { if (_all_global_rf_ready_or_timeout) { return false; } - _all_global_rf_ready_or_timeout = _precondition_block_timer_sw->elapsed_time() >= _global_rf_wait_timeout_ns || // Timeout, std::all_of(_global_rf_descriptors.begin(), _global_rf_descriptors.end(), [](auto* rf_desc) { @@ -440,7 +463,27 @@ class PipelineDriver { void set_driver_queue_level(size_t driver_queue_level) { _driver_queue_level = driver_queue_level; } inline bool is_in_ready_queue() const { return _in_ready_queue.load(std::memory_order_acquire); } - void set_in_ready_queue(bool v) { _in_ready_queue.store(v, std::memory_order_release); } + void set_in_ready_queue(bool v) { + SCHEDULE_CHECK(!v || !is_in_ready_queue()); + _in_ready_queue.store(v, std::memory_order_release); + } + + bool is_in_block_queue() const { return _in_block_queue.load(std::memory_order_acquire); } + void set_in_block_queue(bool v) { + SCHEDULE_CHECK(!v || !is_in_block_queue()); + SCHEDULE_CHECK(!is_in_ready_queue()); + _in_block_queue.store(v, std::memory_order_release); + } + + ScheduleToken acquire_schedule_token() { + bool val = false; + return {this, _schedule_token.compare_exchange_strong(val, true)}; + } + + DECLARE_RACE_DETECTOR(schedule) + + bool need_check_reschedule() const { return _need_check_reschedule; } + void set_need_check_reschedule(bool need_reschedule) { _need_check_reschedule = need_reschedule; } inline std::string get_name() const { return strings::Substitute("PipelineDriver (id=$0)", _driver_id); } @@ -456,6 +499,9 @@ class PipelineDriver { return source_operator()->is_epoch_finishing() || sink_operator()->is_epoch_finishing(); } + PipelineObserver* observer() { return &_observer; } + void assign_observer(); + protected: PipelineDriver() : _operators(), @@ -463,7 +509,8 @@ class PipelineDriver { _fragment_ctx(nullptr), _pipeline(nullptr), _source_node_id(0), - _driver_id(0) {} + _driver_id(0), + _observer(this) {} // Yield PipelineDriver when maximum time in nano-seconds has spent in current execution round. static constexpr int64_t YIELD_MAX_TIME_SPENT_NS = 100'000'000L; @@ -526,9 +573,16 @@ class PipelineDriver { // The index of QuerySharedDriverQueue._queues which this driver belongs to. size_t _driver_queue_level = 0; std::atomic _in_ready_queue{false}; + // Indicates whether it is in a block queue. Only used in EventScheduler mode. + std::atomic _in_block_queue{false}; + + std::atomic _schedule_token{true}; + // Indicates if the block queue needs to be checked when it is added to the block queue. See EventScheduler for details. + std::atomic _need_check_reschedule{false}; std::atomic _has_log_cancelled{false}; + PipelineObserver _observer; // metrics RuntimeProfile::Counter* _total_timer = nullptr; RuntimeProfile::Counter* _active_timer = nullptr; From 142ea2a6d1d8f01afa2ad5d6d80e124ff8f46592 Mon Sep 17 00:00:00 2001 From: stdpain Date: Tue, 24 Dec 2024 15:50:49 +0800 Subject: [PATCH 03/10] add case Signed-off-by: stdpain --- be/src/exec/CMakeLists.txt | 1 - be/src/exec/pipeline/pipeline_driver.cpp | 1 - .../exec/pipeline/schedule/timeout_tasks.cpp | 42 ----------------- be/src/exec/pipeline/schedule/timeout_tasks.h | 46 ------------------- .../exec/pipeline/schedule/observer_test.cpp | 26 +++++++++++ 5 files changed, 26 insertions(+), 90 deletions(-) delete mode 100644 be/src/exec/pipeline/schedule/timeout_tasks.cpp delete mode 100644 be/src/exec/pipeline/schedule/timeout_tasks.h diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index d9ef2cb9cfd5f..b93c5cd60d2d9 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -289,7 +289,6 @@ set(EXEC_FILES pipeline/schedule/event_scheduler.cpp pipeline/schedule/observer.cpp pipeline/schedule/pipeline_timer.cpp - pipeline/schedule/timeout_tasks.cpp workgroup/pipeline_executor_set.cpp workgroup/pipeline_executor_set_manager.cpp workgroup/work_group.cpp diff --git a/be/src/exec/pipeline/pipeline_driver.cpp b/be/src/exec/pipeline/pipeline_driver.cpp index 026eb52cadf5a..41474a6274b9f 100644 --- a/be/src/exec/pipeline/pipeline_driver.cpp +++ b/be/src/exec/pipeline/pipeline_driver.cpp @@ -25,7 +25,6 @@ #include "exec/pipeline/pipeline_driver_executor.h" #include "exec/pipeline/scan/olap_scan_operator.h" #include "exec/pipeline/scan/scan_operator.h" -#include "exec/pipeline/schedule/timeout_tasks.h" #include "exec/pipeline/source_operator.h" #include "exec/query_cache/cache_operator.h" #include "exec/query_cache/lane_arbiter.h" diff --git a/be/src/exec/pipeline/schedule/timeout_tasks.cpp b/be/src/exec/pipeline/schedule/timeout_tasks.cpp deleted file mode 100644 index 631631500642f..0000000000000 --- a/be/src/exec/pipeline/schedule/timeout_tasks.cpp +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2021-present StarRocks, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "exec/pipeline/schedule/timeout_tasks.h" - -#include "exec/pipeline/fragment_context.h" -#include "exec/pipeline/pipeline_fwd.h" -#include "exec/pipeline/schedule/common.h" -#include "util/stack_util.h" - -namespace starrocks::pipeline { -void CheckFragmentTimeout::Run() { - auto query_ctx = _fragment_ctx->runtime_state()->query_ctx(); - size_t expire_seconds = query_ctx->get_query_expire_seconds(); - TRACE_SCHEDULE_LOG << "fragment_instance_id:" << print_id(_fragment_ctx->fragment_instance_id()); - _fragment_ctx->cancel(Status::TimedOut(fmt::format("Query exceeded time limit of {} seconds", expire_seconds))); - - _fragment_ctx->iterate_drivers([](const DriverPtr& driver) { - driver->set_need_check_reschedule(true); - if (driver->is_in_block_queue()) { - TRACE_SCHEDULE_LOG << "[Driver] Timeout " << driver->to_readable_string(); - driver->observer()->cancel_update(); - } - }); -} - -void RFScanWaitTimeout::Run() { - _timeout.notify_source_observers(); -} - -} // namespace starrocks::pipeline \ No newline at end of file diff --git a/be/src/exec/pipeline/schedule/timeout_tasks.h b/be/src/exec/pipeline/schedule/timeout_tasks.h deleted file mode 100644 index b6b5ef2ad5369..0000000000000 --- a/be/src/exec/pipeline/schedule/timeout_tasks.h +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2021-present StarRocks, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include "exec/pipeline/schedule/observer.h" -#include "exec/pipeline/schedule/pipeline_timer.h" -#include "runtime/descriptors.h" - -namespace starrocks::pipeline { -class FragmentContext; - -// TimerTask object, fragment->cancel is called if the timeout is reached. -class CheckFragmentTimeout final : public PipelineTimerTask { -public: - CheckFragmentTimeout(FragmentContext* fragment_ctx) : _fragment_ctx(fragment_ctx) {} - void Run() override; - -private: - FragmentContext* _fragment_ctx; -}; - -// If the timeout is reached, a cancel_update event is sent to all objects observing _timeout. -class RFScanWaitTimeout final : public PipelineTimerTask { -public: - RFScanWaitTimeout(FragmentContext* fragment_ctx) : _fragment_ctx(fragment_ctx) {} - void add_observer(RuntimeState* state, PipelineObserver* observer) { _timeout.add_observer(state, observer); } - void Run() override; - -private: - FragmentContext* _fragment_ctx; - Observable _timeout; -}; - -} // namespace starrocks::pipeline \ No newline at end of file diff --git a/be/test/exec/pipeline/schedule/observer_test.cpp b/be/test/exec/pipeline/schedule/observer_test.cpp index ec738bc64d8d1..870f2aeceff0c 100644 --- a/be/test/exec/pipeline/schedule/observer_test.cpp +++ b/be/test/exec/pipeline/schedule/observer_test.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include @@ -120,6 +121,31 @@ TEST(TimerThreadTest, test) { noop.unschedule(&timer); ASSERT_FALSE(changed); } + { + std::counting_semaphore<> s(0); + int changed = false; + struct SleepTimer : public PipelineTimerTask { + SleepTimer(int32_t& changed_, std::counting_semaphore<>& s_) : changed(changed_), s(s_) {} + void Run() override { + s.release(); + (void)sleep(5); + changed = true; + } + int32_t& changed; + std::counting_semaphore<>& s; + }; + SleepTimer noop(changed, s); + // + timespec abstime = butil::microseconds_to_timespec(butil::gettimeofday_us()); + timespec s1 = abstime; + s1.tv_sec -= 10; + // schedule a expired task + ASSERT_OK(timer.schedule(&noop, s1)); + s.acquire(); + // will wait util timer finished + noop.unschedule(&timer); + ASSERT_TRUE(changed); + } } TEST(ObservableTest, test) { From 59315fa42c08a7c5448e573958e7ee1657b19178 Mon Sep 17 00:00:00 2001 From: stdpain Date: Tue, 24 Dec 2024 17:20:27 +0800 Subject: [PATCH 04/10] fix unstable UT Signed-off-by: stdpain --- be/test/exec/workgroup/scan_task_queue_test.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/be/test/exec/workgroup/scan_task_queue_test.cpp b/be/test/exec/workgroup/scan_task_queue_test.cpp index 223df792d105b..470e53dcd6fdb 100644 --- a/be/test/exec/workgroup/scan_task_queue_test.cpp +++ b/be/test/exec/workgroup/scan_task_queue_test.cpp @@ -83,6 +83,7 @@ PARALLEL_TEST(ScanExecutorTest, test_yield) { ctx.total_yield_point_cnt = 2; DCHECK_LT(ctx.yield_point, ctx.total_yield_point_cnt); if (ctx.yield_point == 1) { + std::lock_guard guard(mutex); finished_tasks++; cv.notify_one(); } From 7791a7d7a1ab44fb755050092156ed0cbffa9deb Mon Sep 17 00:00:00 2001 From: stdpain Date: Tue, 24 Dec 2024 19:30:21 +0800 Subject: [PATCH 05/10] tmp save Signed-off-by: stdpain --- .../exec/pipeline/schedule/observer_test.cpp | 109 +++++++++++++++--- 1 file changed, 94 insertions(+), 15 deletions(-) diff --git a/be/test/exec/pipeline/schedule/observer_test.cpp b/be/test/exec/pipeline/schedule/observer_test.cpp index 870f2aeceff0c..d7fd9759c2896 100644 --- a/be/test/exec/pipeline/schedule/observer_test.cpp +++ b/be/test/exec/pipeline/schedule/observer_test.cpp @@ -148,29 +148,58 @@ TEST(TimerThreadTest, test) { } } -TEST(ObservableTest, test) { - auto dummy_query_ctx = std::make_shared(); - auto dummy_fragment_ctx = std::make_shared(); - auto exec_group = std::make_shared(); - auto runtime_state = std::make_shared(); - runtime_state->_obj_pool = std::make_shared(); - runtime_state->set_query_ctx(dummy_query_ctx.get()); - runtime_state->set_fragment_ctx(dummy_fragment_ctx.get()); - runtime_state->_profile = std::make_shared("dummy"); - dummy_fragment_ctx->set_runtime_state(std::move(runtime_state)); +class PipelineObserverTest : public ::testing::Test { +public: + void SetUp() override { + _dummy_query_ctx = std::make_shared(); + _dummy_fragment_ctx = std::make_shared(); + _exec_group = std::make_shared(); + _runtime_state = std::make_shared(); + _runtime_state->_obj_pool = std::make_shared(); + _runtime_state->set_query_ctx(_dummy_query_ctx.get()); + _runtime_state->set_fragment_ctx(_dummy_fragment_ctx.get()); + _runtime_state->_profile = std::make_shared("dummy"); + _dummy_fragment_ctx->set_runtime_state(std::move(_runtime_state)); + _runtime_state = _dummy_fragment_ctx->runtime_state_ptr(); + } + + std::shared_ptr _dummy_query_ctx; + std::shared_ptr _dummy_fragment_ctx; + std::shared_ptr _exec_group; + std::shared_ptr _runtime_state; +}; + +struct SimpleTestContext { + SimpleTestContext(OpFactories factories, ExecutionGroup* exec_group, FragmentContext* fragment_ctx, + QueryContext* query_ctx) + : pipeline(0, std::move(factories), exec_group) { + auto operators = pipeline.create_operators(1, 0); + driver = std::make_unique(operators, query_ctx, fragment_ctx, &pipeline, 1); + driver->assign_observer(); + driver_queue = std::make_unique(); + fragment_ctx->init_event_scheduler(); + fragment_ctx->event_scheduler()->attach_queue(driver_queue.get()); + } + + Pipeline pipeline; + std::unique_ptr driver_queue; + std::unique_ptr driver; +}; + +TEST_F(PipelineObserverTest, basic_test) { OpFactories factories; factories.emplace_back(std::make_shared(0, 1)); factories.emplace_back(std::make_shared(2, 3)); - Pipeline pipeline(0, factories, exec_group.get()); + Pipeline pipeline(0, factories, _exec_group.get()); auto operators = pipeline.create_operators(1, 0); - PipelineDriver driver(operators, dummy_query_ctx.get(), dummy_fragment_ctx.get(), &pipeline, 1); + PipelineDriver driver(operators, _dummy_query_ctx.get(), _dummy_fragment_ctx.get(), &pipeline, 1); driver.assign_observer(); - ASSERT_OK(driver.prepare(dummy_fragment_ctx->runtime_state())); + ASSERT_OK(driver.prepare(_dummy_fragment_ctx->runtime_state())); auto driver_queue = std::make_unique(); - dummy_fragment_ctx->init_event_scheduler(); - dummy_fragment_ctx->event_scheduler()->attach_queue(driver_queue.get()); + _dummy_fragment_ctx->init_event_scheduler(); + _dummy_fragment_ctx->event_scheduler()->attach_queue(driver_queue.get()); driver.set_in_block_queue(true); driver.set_driver_state(DriverState::INPUT_EMPTY); @@ -184,4 +213,54 @@ TEST(ObservableTest, test) { driver.observer()->source_update(); driver.observer()->source_update(); } + +TEST_F(PipelineObserverTest, basic_test2) { + OpFactories factories; + factories.emplace_back(std::make_shared(0, 1)); + factories.emplace_back(std::make_shared(2, 3)); + SimpleTestContext tx(factories, _exec_group.get(), _dummy_fragment_ctx.get(), _dummy_query_ctx.get()); + ASSERT_OK(tx.driver->prepare(_runtime_state.get())); + const auto& driver = tx.driver; + const auto& driver_queue = tx.driver_queue; + + driver->set_in_block_queue(true); + driver->set_driver_state(DriverState::INPUT_EMPTY); + // test notify + driver->observer()->all_update(); + ASSERT_OK(driver_queue->take(false)); + driver->observer()->cancel_update(); + ASSERT_OK(driver_queue->take(false)); + driver->observer()->sink_update(); + ASSERT_OK(driver_queue->take(false)); + driver->observer()->source_update(); + driver->observer()->source_update(); +} + +TEST_F(PipelineObserverTest, race_scheduler_with_observer) { + OpFactories factories; + factories.emplace_back(std::make_shared(0, 1)); + factories.emplace_back(std::make_shared(2, 3)); + SimpleTestContext tx(factories, _exec_group.get(), _dummy_fragment_ctx.get(), _dummy_query_ctx.get()); + ASSERT_OK(tx.driver->prepare(_runtime_state.get())); + const auto& driver = tx.driver; + const auto& driver_queue = tx.driver_queue; + + driver->set_in_block_queue(false); + driver->set_driver_state(DriverState::INPUT_EMPTY); + driver->observer()->sink_update(); + _dummy_fragment_ctx->event_scheduler()->add_blocked_driver(driver.get()); + + driver->set_in_block_queue(true); + driver->set_driver_state(DriverState::INPUT_EMPTY); + // test notify + driver->observer()->all_update(); + ASSERT_OK(driver_queue->take(false)); + driver->observer()->cancel_update(); + ASSERT_OK(driver_queue->take(false)); + driver->observer()->sink_update(); + ASSERT_OK(driver_queue->take(false)); + driver->observer()->source_update(); + driver->observer()->source_update(); +} + } // namespace starrocks::pipeline \ No newline at end of file From 8d6abcf265185afd93049f4eb94b6d78f2ec20bb Mon Sep 17 00:00:00 2001 From: stdpain Date: Tue, 24 Dec 2024 20:15:36 +0800 Subject: [PATCH 06/10] add more case Signed-off-by: stdpain --- be/src/exec/pipeline/empty_set_operator.h | 2 + be/src/exec/pipeline/noop_sink_operator.h | 2 + .../exec/pipeline/schedule/observer_test.cpp | 60 ++++++------------- 3 files changed, 21 insertions(+), 43 deletions(-) diff --git a/be/src/exec/pipeline/empty_set_operator.h b/be/src/exec/pipeline/empty_set_operator.h index bef6fdadcf01e..c002e5e99ddca 100644 --- a/be/src/exec/pipeline/empty_set_operator.h +++ b/be/src/exec/pipeline/empty_set_operator.h @@ -41,6 +41,8 @@ class EmptySetOperatorFactory final : public SourceOperatorFactory { ~EmptySetOperatorFactory() override = default; + bool support_event_scheduler() const override { return true; } + OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override { return std::make_shared(this, _id, _plan_node_id, driver_sequence); } diff --git a/be/src/exec/pipeline/noop_sink_operator.h b/be/src/exec/pipeline/noop_sink_operator.h index 57d1cbb78a64a..1f11741204f00 100644 --- a/be/src/exec/pipeline/noop_sink_operator.h +++ b/be/src/exec/pipeline/noop_sink_operator.h @@ -55,6 +55,8 @@ class NoopSinkOperatorFactory final : public OperatorFactory { ~NoopSinkOperatorFactory() override = default; + bool support_event_scheduler() const override { return true; } + OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override { return std::make_shared(this, _id, _plan_node_id, driver_sequence); } diff --git a/be/test/exec/pipeline/schedule/observer_test.cpp b/be/test/exec/pipeline/schedule/observer_test.cpp index d7fd9759c2896..366da1ac6567c 100644 --- a/be/test/exec/pipeline/schedule/observer_test.cpp +++ b/be/test/exec/pipeline/schedule/observer_test.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "exec/pipeline/schedule/observer.h" + #include #include @@ -35,6 +37,9 @@ #include "testutil/assert.h" #include "util/runtime_profile.h" +#pragma GCC push_options +#pragma GCC optimize("no-inline") + namespace starrocks::pipeline { TEST(RequestCntlTest, test) { { @@ -191,33 +196,10 @@ TEST_F(PipelineObserverTest, basic_test) { factories.emplace_back(std::make_shared(0, 1)); factories.emplace_back(std::make_shared(2, 3)); - Pipeline pipeline(0, factories, _exec_group.get()); - auto operators = pipeline.create_operators(1, 0); - PipelineDriver driver(operators, _dummy_query_ctx.get(), _dummy_fragment_ctx.get(), &pipeline, 1); - driver.assign_observer(); - ASSERT_OK(driver.prepare(_dummy_fragment_ctx->runtime_state())); - - auto driver_queue = std::make_unique(); - _dummy_fragment_ctx->init_event_scheduler(); - _dummy_fragment_ctx->event_scheduler()->attach_queue(driver_queue.get()); - - driver.set_in_block_queue(true); - driver.set_driver_state(DriverState::INPUT_EMPTY); - // test notify - driver.observer()->all_update(); - ASSERT_OK(driver_queue->take(false)); - driver.observer()->cancel_update(); - ASSERT_OK(driver_queue->take(false)); - driver.observer()->sink_update(); - ASSERT_OK(driver_queue->take(false)); - driver.observer()->source_update(); - driver.observer()->source_update(); -} + for (auto& factory : factories) { + ASSERT_TRUE(factory->support_event_scheduler()); + } -TEST_F(PipelineObserverTest, basic_test2) { - OpFactories factories; - factories.emplace_back(std::make_shared(0, 1)); - factories.emplace_back(std::make_shared(2, 3)); SimpleTestContext tx(factories, _exec_group.get(), _dummy_fragment_ctx.get(), _dummy_query_ctx.get()); ASSERT_OK(tx.driver->prepare(_runtime_state.get())); const auto& driver = tx.driver; @@ -236,31 +218,23 @@ TEST_F(PipelineObserverTest, basic_test2) { driver->observer()->source_update(); } -TEST_F(PipelineObserverTest, race_scheduler_with_observer) { +TEST_F(PipelineObserverTest, test_obs) { OpFactories factories; factories.emplace_back(std::make_shared(0, 1)); factories.emplace_back(std::make_shared(2, 3)); SimpleTestContext tx(factories, _exec_group.get(), _dummy_fragment_ctx.get(), _dummy_query_ctx.get()); ASSERT_OK(tx.driver->prepare(_runtime_state.get())); const auto& driver = tx.driver; - const auto& driver_queue = tx.driver_queue; - - driver->set_in_block_queue(false); - driver->set_driver_state(DriverState::INPUT_EMPTY); - driver->observer()->sink_update(); - _dummy_fragment_ctx->event_scheduler()->add_blocked_driver(driver.get()); driver->set_in_block_queue(true); driver->set_driver_state(DriverState::INPUT_EMPTY); - // test notify - driver->observer()->all_update(); - ASSERT_OK(driver_queue->take(false)); - driver->observer()->cancel_update(); - ASSERT_OK(driver_queue->take(false)); - driver->observer()->sink_update(); - ASSERT_OK(driver_queue->take(false)); - driver->observer()->source_update(); - driver->observer()->source_update(); + Observable obs; + _runtime_state->set_enable_event_scheduler(true); + obs.add_observer(_runtime_state.get(), driver->observer()); + ASSERT_GT(obs.to_string().size(), 0); + obs.notify_sink_observers(); + obs.notify_source_observers(); } -} // namespace starrocks::pipeline \ No newline at end of file +} // namespace starrocks::pipeline +#pragma GCC pop_options From 19f5211e96ce5da6de83e64bd73f25b3b5fd640f Mon Sep 17 00:00:00 2001 From: stdpain Date: Tue, 24 Dec 2024 20:46:25 +0800 Subject: [PATCH 07/10] save Signed-off-by: stdpain --- .../exec/pipeline/schedule/observer_test.cpp | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/be/test/exec/pipeline/schedule/observer_test.cpp b/be/test/exec/pipeline/schedule/observer_test.cpp index 366da1ac6567c..9128282801491 100644 --- a/be/test/exec/pipeline/schedule/observer_test.cpp +++ b/be/test/exec/pipeline/schedule/observer_test.cpp @@ -227,7 +227,7 @@ TEST_F(PipelineObserverTest, test_obs) { const auto& driver = tx.driver; driver->set_in_block_queue(true); - driver->set_driver_state(DriverState::INPUT_EMPTY); + driver->set_driver_state(DriverState::PENDING_FINISH); Observable obs; _runtime_state->set_enable_event_scheduler(true); obs.add_observer(_runtime_state.get(), driver->observer()); @@ -236,5 +236,23 @@ TEST_F(PipelineObserverTest, test_obs) { obs.notify_source_observers(); } +TEST_F(PipelineObserverTest, test_cancel) { + OpFactories factories; + factories.emplace_back(std::make_shared(0, 1)); + factories.emplace_back(std::make_shared(2, 3)); + + SimpleTestContext tx(factories, _exec_group.get(), _dummy_fragment_ctx.get(), _dummy_query_ctx.get()); + ASSERT_OK(tx.driver->prepare(_runtime_state.get())); + const auto& driver = tx.driver; + + driver->set_driver_state(DriverState::INPUT_EMPTY); + _dummy_fragment_ctx->cancel(Status::InternalError("error")); + driver->set_in_block_queue(true); + driver->observer()->all_update(); + for (size_t i = 0; i < driver->_operator_stages.size(); ++i) { + driver->_operator_stages[i] = OperatorStage::CLOSED; + } +} + } // namespace starrocks::pipeline #pragma GCC pop_options From 8312196079efa0cf1ddc6c47dd711489d5d1790f Mon Sep 17 00:00:00 2001 From: stdpain Date: Tue, 24 Dec 2024 23:54:22 +0800 Subject: [PATCH 08/10] fix ut Signed-off-by: stdpain --- be/test/exec/pipeline/schedule/observer_test.cpp | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/be/test/exec/pipeline/schedule/observer_test.cpp b/be/test/exec/pipeline/schedule/observer_test.cpp index 9128282801491..f5b38f40140aa 100644 --- a/be/test/exec/pipeline/schedule/observer_test.cpp +++ b/be/test/exec/pipeline/schedule/observer_test.cpp @@ -63,6 +63,8 @@ TEST(RequestCntlTest, test) { std::condition_variable cv; std::atomic_int32_t val{}; std::counting_semaphore<> s(0); + std::atomic_int32_t sync{}; + std::vector data; data.emplace_back(0); std::vector threads; @@ -71,7 +73,7 @@ TEST(RequestCntlTest, test) { { std::unique_lock lock(mutex); s.release(); - cv.wait(lock); + cv.wait(lock, [&]() { return sync != 0; }); } AtomicRequestControler cntl(val, [&]() { while (!data.empty()) { @@ -83,7 +85,12 @@ TEST(RequestCntlTest, test) { for (size_t i = 0; i < 10; ++i) { s.acquire(); } - cv.notify_all(); + { + std::lock_guard guard(mutex); + sync = 1; + cv.notify_all(); + } + for (auto& thread : threads) { thread.join(); } From 2fb8a1e17e6cdbeed2596e7fe0568ee9f92521fe Mon Sep 17 00:00:00 2001 From: stdpain Date: Wed, 25 Dec 2024 10:46:27 +0800 Subject: [PATCH 09/10] add case Signed-off-by: stdpain --- .../exec/pipeline/schedule/observer_test.cpp | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/be/test/exec/pipeline/schedule/observer_test.cpp b/be/test/exec/pipeline/schedule/observer_test.cpp index f5b38f40140aa..3414d85749abc 100644 --- a/be/test/exec/pipeline/schedule/observer_test.cpp +++ b/be/test/exec/pipeline/schedule/observer_test.cpp @@ -261,5 +261,41 @@ TEST_F(PipelineObserverTest, test_cancel) { } } +TEST_F(PipelineObserverTest, test_add_blocked_driver) { + OpFactories factories; + factories.emplace_back(std::make_shared(0, 1)); + factories.emplace_back(std::make_shared(2, 3)); + + SimpleTestContext tx(factories, _exec_group.get(), _dummy_fragment_ctx.get(), _dummy_query_ctx.get()); + ASSERT_OK(tx.driver->prepare(_runtime_state.get())); + const auto& driver = tx.driver; + + driver->set_driver_state(DriverState::INPUT_EMPTY); + _dummy_fragment_ctx->event_scheduler()->add_blocked_driver(driver.get()); +} + +TEST_F(PipelineObserverTest, race_scheduler_observer) { + OpFactories factories; + factories.emplace_back(std::make_shared(0, 1)); + factories.emplace_back(std::make_shared(2, 3)); + + SimpleTestContext tx(factories, _exec_group.get(), _dummy_fragment_ctx.get(), _dummy_query_ctx.get()); + ASSERT_OK(tx.driver->prepare(_runtime_state.get())); + const auto& driver = tx.driver; + + driver->set_driver_state(DriverState::INPUT_EMPTY); + Observable obs; + obs.add_observer(_runtime_state.get(), driver->observer()); + + std::vector threads; + + threads.emplace_back([&]() { _dummy_fragment_ctx->event_scheduler()->add_blocked_driver(driver.get()); }); + threads.emplace_back([&]() { obs.notify_source_observers(); }); + + for (auto& thread : threads) { + thread.join(); + } +} + } // namespace starrocks::pipeline #pragma GCC pop_options From 2428618dfecf93b5ed359503fc71730bcce084cb Mon Sep 17 00:00:00 2001 From: stdpain Date: Wed, 25 Dec 2024 12:40:02 +0800 Subject: [PATCH 10/10] fix comments Signed-off-by: stdpain --- be/src/exec/pipeline/pipeline_driver.h | 28 +++++++++++-------- .../pipeline/pipeline_driver_executor.cpp | 2 +- .../exec/pipeline/pipeline_driver_queue.cpp | 12 ++++---- .../pipeline/schedule/event_scheduler.cpp | 10 +++---- be/src/exec/pipeline/schedule/observer.cpp | 7 +++-- be/src/exec/pipeline/schedule/observer.h | 12 ++++---- .../exec/pipeline/schedule/observer_test.cpp | 18 ++++++------ 7 files changed, 47 insertions(+), 42 deletions(-) diff --git a/be/src/exec/pipeline/pipeline_driver.h b/be/src/exec/pipeline/pipeline_driver.h index 2c352db9159ef..f1b60b49ba11f 100644 --- a/be/src/exec/pipeline/pipeline_driver.h +++ b/be/src/exec/pipeline/pipeline_driver.h @@ -198,6 +198,9 @@ class PipelineDriver { friend class PipelineDriverPoller; public: + // used in event scheduler + // If event_scheduler doesn't get the token, it proves that the observer has entered the critical zone, + // and we don't know the real state of the driver, so we need to try scheduling the driver one more time. class ScheduleToken { public: ScheduleToken(DriverRawPtr driver, bool acquired) : _driver(driver), _acquired(acquired) {} @@ -462,17 +465,17 @@ class PipelineDriver { size_t get_driver_queue_level() const { return _driver_queue_level; } void set_driver_queue_level(size_t driver_queue_level) { _driver_queue_level = driver_queue_level; } - inline bool is_in_ready_queue() const { return _in_ready_queue.load(std::memory_order_acquire); } - void set_in_ready_queue(bool v) { - SCHEDULE_CHECK(!v || !is_in_ready_queue()); - _in_ready_queue.store(v, std::memory_order_release); + inline bool is_in_ready() const { return _in_ready.load(std::memory_order_acquire); } + void set_in_ready(bool v) { + SCHEDULE_CHECK(!v || !is_in_ready()); + _in_ready.store(v, std::memory_order_release); } - bool is_in_block_queue() const { return _in_block_queue.load(std::memory_order_acquire); } - void set_in_block_queue(bool v) { - SCHEDULE_CHECK(!v || !is_in_block_queue()); - SCHEDULE_CHECK(!is_in_ready_queue()); - _in_block_queue.store(v, std::memory_order_release); + bool is_in_blocked() const { return _in_blocked.load(std::memory_order_acquire); } + void set_in_blocked(bool v) { + SCHEDULE_CHECK(!v || !is_in_blocked()); + SCHEDULE_CHECK(!is_in_ready()); + _in_blocked.store(v, std::memory_order_release); } ScheduleToken acquire_schedule_token() { @@ -572,9 +575,10 @@ class PipelineDriver { DriverQueue* _in_queue = nullptr; // The index of QuerySharedDriverQueue._queues which this driver belongs to. size_t _driver_queue_level = 0; - std::atomic _in_ready_queue{false}; - // Indicates whether it is in a block queue. Only used in EventScheduler mode. - std::atomic _in_block_queue{false}; + // Indicates whether it is in a ready queue. + std::atomic _in_ready{false}; + // Indicates whether it is in a block states. Only used when enable event scheduler mode. + std::atomic _in_blocked{false}; std::atomic _schedule_token{true}; // Indicates if the block queue needs to be checked when it is added to the block queue. See EventScheduler for details. diff --git a/be/src/exec/pipeline/pipeline_driver_executor.cpp b/be/src/exec/pipeline/pipeline_driver_executor.cpp index 2d97575d69baf..b4da5f9d68cd8 100644 --- a/be/src/exec/pipeline/pipeline_driver_executor.cpp +++ b/be/src/exec/pipeline/pipeline_driver_executor.cpp @@ -285,7 +285,7 @@ void GlobalDriverExecutor::submit(DriverRawPtr driver) { void GlobalDriverExecutor::cancel(DriverRawPtr driver) { // if driver is already in ready queue, we should cancel it // otherwise, just ignore it and wait for the poller to schedule - if (driver->is_in_ready_queue()) { + if (driver->is_in_ready()) { this->_driver_queue->cancel(driver); } } diff --git a/be/src/exec/pipeline/pipeline_driver_queue.cpp b/be/src/exec/pipeline/pipeline_driver_queue.cpp index 6a2a96ea676bf..0caf3f69b3394 100644 --- a/be/src/exec/pipeline/pipeline_driver_queue.cpp +++ b/be/src/exec/pipeline/pipeline_driver_queue.cpp @@ -50,7 +50,7 @@ void QuerySharedDriverQueue::put_back(const DriverRawPtr driver) { { std::lock_guard lock(_global_mutex); _queues[level].put(driver); - driver->set_in_ready_queue(true); + driver->set_in_ready(true); driver->set_in_queue(this); driver->update_peak_driver_queue_size_counter(_num_drivers); _cv.notify_one(); @@ -67,7 +67,7 @@ void QuerySharedDriverQueue::put_back(const std::vector& drivers) std::lock_guard lock(_global_mutex); for (int i = 0; i < drivers.size(); i++) { _queues[levels[i]].put(drivers[i]); - drivers[i]->set_in_ready_queue(true); + drivers[i]->set_in_ready(true); drivers[i]->set_in_queue(this); drivers[i]->update_peak_driver_queue_size_counter(_num_drivers); _cv.notify_one(); @@ -117,7 +117,7 @@ StatusOr QuerySharedDriverQueue::take(const bool block) { if (queue_idx >= 0) { // record queue's index to accumulate time for it. driver_ptr = _queues[queue_idx].take(false); - driver_ptr->set_in_ready_queue(false); + driver_ptr->set_in_ready(false); --_num_drivers; } @@ -132,7 +132,7 @@ void QuerySharedDriverQueue::cancel(DriverRawPtr driver) { if (_is_closed) { return; } - if (!driver->is_in_ready_queue()) { + if (!driver->is_in_ready()) { return; } int level = driver->get_driver_queue_level(); @@ -174,7 +174,7 @@ void SubQuerySharedDriverQueue::put(const DriverRawPtr driver) { void SubQuerySharedDriverQueue::cancel(const DriverRawPtr driver) { if (cancelled_set.count(driver) == 0) { - DCHECK(driver->is_in_ready_queue()); + DCHECK(driver->is_in_ready()); pending_cancel_queue.emplace(driver); } } @@ -291,7 +291,7 @@ void WorkGroupDriverQueue::cancel(DriverRawPtr driver) { if (_is_closed) { return; } - if (!driver->is_in_ready_queue()) { + if (!driver->is_in_ready()) { return; } auto* wg_entity = driver->workgroup()->driver_sched_entity(); diff --git a/be/src/exec/pipeline/schedule/event_scheduler.cpp b/be/src/exec/pipeline/schedule/event_scheduler.cpp index 89ce66e343fe7..4575ac5877a0d 100644 --- a/be/src/exec/pipeline/schedule/event_scheduler.cpp +++ b/be/src/exec/pipeline/schedule/event_scheduler.cpp @@ -25,20 +25,20 @@ namespace starrocks::pipeline { void EventScheduler::add_blocked_driver(const DriverRawPtr driver) { // Capture query-context is needed before calling reschedule to avoid UAF auto query_ctx = driver->fragment_ctx()->runtime_state()->query_ctx()->shared_from_this(); - SCHEDULE_CHECK(!driver->is_in_block_queue()); - driver->set_in_block_queue(true); + SCHEDULE_CHECK(!driver->is_in_blocked()); + driver->set_in_blocked(true); TRACE_SCHEDULE_LOG << "TRACE add to block queue:" << driver << "," << driver->to_readable_string(); auto token = driver->acquire_schedule_token(); // The driver is ready put to block queue. but is_in_block_queue is false, but the driver is active. // set this flag to make the block queue should check the driver is active if (!token.acquired() || driver->need_check_reschedule()) { - driver->observer()->cancel_update(); + driver->observer()->cancel_trigger(); } } // For a single driver try_schedule has no concurrency. void EventScheduler::try_schedule(const DriverRawPtr driver) { - SCHEDULE_CHECK(driver->is_in_block_queue()); + SCHEDULE_CHECK(driver->is_in_blocked()); bool add_to_ready_queue = false; RACE_DETECT(driver->schedule); @@ -69,7 +69,7 @@ void EventScheduler::try_schedule(const DriverRawPtr driver) { if (add_to_ready_queue) { TRACE_SCHEDULE_LOG << "TRACE schedule driver:" << driver << " to ready queue"; driver->set_need_check_reschedule(false); - driver->set_in_block_queue(false); + driver->set_in_blocked(false); _driver_queue->put_back(driver); } } diff --git a/be/src/exec/pipeline/schedule/observer.cpp b/be/src/exec/pipeline/schedule/observer.cpp index c20018ee2fa4e..a19b9a432ec7f 100644 --- a/be/src/exec/pipeline/schedule/observer.cpp +++ b/be/src/exec/pipeline/schedule/observer.cpp @@ -22,8 +22,7 @@ static void on_update(PipelineDriver* driver) { auto sink = driver->sink_operator(); auto source = driver->source_operator(); TRACE_SCHEDULE_LOG << "notify driver:" << driver << " state:" << driver->driver_state() - << " in_block_queue:" << driver->is_in_block_queue() - << " source finished:" << source->is_finished() + << " in_block_queue:" << driver->is_in_blocked() << " source finished:" << source->is_finished() << " operator has output:" << source->has_output() << " sink finished:" << sink->is_finished() << " sink need input:" << sink->need_input() << ":" << driver->to_readable_string(); if (sink->is_finished() || sink->need_input() || source->is_finished() || source->has_output()) { @@ -49,7 +48,9 @@ void PipelineObserver::_do_update(int event) { auto driver = _driver; auto token = driver->acquire_schedule_token(); - if (driver->is_in_block_queue()) { + if (driver->is_in_blocked()) { + // In PRECONDITION state, has_output need_input may return false. In this case, + // we need to schedule the driver to INPUT_EMPTY/OUTPUT_FULL state. bool pipeline_block = driver->driver_state() != DriverState::INPUT_EMPTY || driver->driver_state() != DriverState::OUTPUT_FULL; if (pipeline_block || _is_cancel_changed(event)) { diff --git a/be/src/exec/pipeline/schedule/observer.h b/be/src/exec/pipeline/schedule/observer.h index a448da2ada2df..025e89a2847e4 100644 --- a/be/src/exec/pipeline/schedule/observer.h +++ b/be/src/exec/pipeline/schedule/observer.h @@ -39,22 +39,22 @@ class PipelineObserver { DISALLOW_COPY_AND_MOVE(PipelineObserver); - void source_update() { + void source_trigger() { _active_event(SOURCE_CHANGE_EVENT); _update([this](int event) { _do_update(event); }); } - void sink_update() { + void sink_trigger() { _active_event(SINK_CHANGE_EVENT); _update([this](int event) { _do_update(event); }); } - void cancel_update() { + void cancel_trigger() { _active_event(CANCEL_EVENT); _update([this](int event) { _do_update(event); }); } - void all_update() { + void all_trigger() { _active_event(SOURCE_CHANGE_EVENT | SINK_CHANGE_EVENT); _update([this](int event) { _do_update(event); }); } @@ -111,12 +111,12 @@ class Observable { void notify_source_observers() { for (auto* observer : _observers) { - observer->source_update(); + observer->source_trigger(); } } void notify_sink_observers() { for (auto* observer : _observers) { - observer->sink_update(); + observer->sink_trigger(); } } diff --git a/be/test/exec/pipeline/schedule/observer_test.cpp b/be/test/exec/pipeline/schedule/observer_test.cpp index 3414d85749abc..838c41aa2181c 100644 --- a/be/test/exec/pipeline/schedule/observer_test.cpp +++ b/be/test/exec/pipeline/schedule/observer_test.cpp @@ -212,17 +212,17 @@ TEST_F(PipelineObserverTest, basic_test) { const auto& driver = tx.driver; const auto& driver_queue = tx.driver_queue; - driver->set_in_block_queue(true); + driver->set_in_blocked(true); driver->set_driver_state(DriverState::INPUT_EMPTY); // test notify - driver->observer()->all_update(); + driver->observer()->all_trigger(); ASSERT_OK(driver_queue->take(false)); - driver->observer()->cancel_update(); + driver->observer()->cancel_trigger(); ASSERT_OK(driver_queue->take(false)); - driver->observer()->sink_update(); + driver->observer()->sink_trigger(); ASSERT_OK(driver_queue->take(false)); - driver->observer()->source_update(); - driver->observer()->source_update(); + driver->observer()->source_trigger(); + driver->observer()->source_trigger(); } TEST_F(PipelineObserverTest, test_obs) { @@ -233,7 +233,7 @@ TEST_F(PipelineObserverTest, test_obs) { ASSERT_OK(tx.driver->prepare(_runtime_state.get())); const auto& driver = tx.driver; - driver->set_in_block_queue(true); + driver->set_in_blocked(true); driver->set_driver_state(DriverState::PENDING_FINISH); Observable obs; _runtime_state->set_enable_event_scheduler(true); @@ -254,8 +254,8 @@ TEST_F(PipelineObserverTest, test_cancel) { driver->set_driver_state(DriverState::INPUT_EMPTY); _dummy_fragment_ctx->cancel(Status::InternalError("error")); - driver->set_in_block_queue(true); - driver->observer()->all_update(); + driver->set_in_blocked(true); + driver->observer()->all_trigger(); for (size_t i = 0; i < driver->_operator_stages.size(); ++i) { driver->_operator_stages[i] = OperatorStage::CLOSED; }