Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] support event scheduler (part 1) #54260

Merged
merged 10 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ set(EXEC_FILES
pipeline/group_execution/execution_group.cpp
pipeline/group_execution/execution_group_builder.cpp
pipeline/group_execution/group_operator.cpp
pipeline/schedule/event_scheduler.cpp
pipeline/schedule/observer.cpp
pipeline/schedule/pipeline_timer.cpp
workgroup/pipeline_executor_set.cpp
workgroup/pipeline_executor_set_manager.cpp
workgroup/work_group.cpp
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/empty_set_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class EmptySetOperatorFactory final : public SourceOperatorFactory {

~EmptySetOperatorFactory() override = default;

bool support_event_scheduler() const override { return true; }

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
return std::make_shared<EmptySetOperator>(this, _id, _plan_node_id, driver_sequence);
}
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/pipeline/fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,4 +404,10 @@ void FragmentContext::_close_stream_load_contexts() {
}
}

void FragmentContext::init_event_scheduler() {
_event_scheduler = std::make_unique<EventScheduler>();
runtime_state()->runtime_profile()->add_info_string("EnableEventScheduler",
enable_event_scheduler() ? "true" : "false");
}

} // namespace starrocks::pipeline
7 changes: 7 additions & 0 deletions be/src/exec/pipeline/fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "exec/pipeline/pipeline_fwd.h"
#include "exec/pipeline/runtime_filter_types.h"
#include "exec/pipeline/scan/morsel.h"
#include "exec/pipeline/schedule/event_scheduler.h"
#include "exec/query_cache/cache_param.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/HeartbeatService.h"
Expand Down Expand Up @@ -174,6 +175,10 @@ class FragmentContext {
// acquire runtime filter from cache
void acquire_runtime_filters();

bool enable_event_scheduler() const { return event_scheduler() != nullptr; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: shall it be event_scheduler_enabled? enable_event_scheduler reads like to perform an action, turn on the event_scheduler, not query the on/off status.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

our session variables style is enable_xxx_feature

EventScheduler* event_scheduler() const { return _event_scheduler.get(); }
void init_event_scheduler();

private:
void _close_stream_load_contexts();

Expand Down Expand Up @@ -235,6 +240,8 @@ class FragmentContext {
RuntimeProfile::Counter* _jit_timer = nullptr;

bool _report_when_finish{};

std::unique_ptr<EventScheduler> _event_scheduler;
};

class FragmentContextManager {
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/noop_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class NoopSinkOperatorFactory final : public OperatorFactory {

~NoopSinkOperatorFactory() override = default;

bool support_event_scheduler() const override { return true; }

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
return std::make_shared<NoopSinkOperator>(this, _id, _plan_node_id, driver_sequence);
}
Expand Down
8 changes: 8 additions & 0 deletions be/src/exec/pipeline/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "column/vectorized_fwd.h"
#include "common/statusor.h"
#include "exec/pipeline/runtime_filter_types.h"
#include "exec/pipeline/schedule/observer.h"
#include "exec/spill/operator_mem_resource_manager.h"
#include "exprs/runtime_filter_bank.h"
#include "gutil/strings/substitute.h"
Expand Down Expand Up @@ -269,6 +270,9 @@ class Operator {

virtual void update_exec_stats(RuntimeState* state);

void set_observer(PipelineObserver* observer) { _observer = observer; }
PipelineObserver* observer() const { return _observer; }

protected:
OperatorFactory* _factory;
const int32_t _id;
Expand Down Expand Up @@ -323,6 +327,8 @@ class Operator {
// such as OlapScanOperator( use separated IO thread to execute the IO task)
std::atomic_int64_t _last_growth_cpu_time_ns = 0;

PipelineObserver* _observer = nullptr;

private:
void _init_rf_counters(bool init_bloom);
void _init_conjuct_counters();
Expand Down Expand Up @@ -408,6 +414,8 @@ class OperatorFactory {
// try to get runtime filter from cache
void acquire_runtime_filter(RuntimeState* state);

virtual bool support_event_scheduler() const { return false; }

protected:
void _prepare_runtime_in_filters(RuntimeState* state);
void _prepare_runtime_holders(const std::vector<RuntimeFilterHolder*>& holders,
Expand Down
7 changes: 7 additions & 0 deletions be/src/exec/pipeline/pipeline_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ Status PipelineDriver::prepare(RuntimeState* runtime_state) {
size_t subscribe_filter_sequence = source_op->get_driver_sequence();
_local_rf_holders =
fragment_ctx()->runtime_filter_hub()->gather_holders(all_local_rf_set, subscribe_filter_sequence);

if (use_cache) {
ssize_t cache_op_idx = -1;
query_cache::CacheOperatorPtr cache_op = nullptr;
Expand Down Expand Up @@ -904,4 +905,10 @@ void PipelineDriver::increment_schedule_times() {
driver_acct().increment_schedule_times();
}

void PipelineDriver::assign_observer() {
for (const auto& op : _operators) {
op->set_observer(&_observer);
stdpain marked this conversation as resolved.
Show resolved Hide resolved
}
}

} // namespace starrocks::pipeline
62 changes: 58 additions & 4 deletions be/src/exec/pipeline/pipeline_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include "exec/pipeline/runtime_filter_types.h"
#include "exec/pipeline/scan/morsel.h"
#include "exec/pipeline/scan/scan_operator.h"
#include "exec/pipeline/schedule/common.h"
#include "exec/pipeline/schedule/observer.h"
#include "exec/pipeline/source_operator.h"
#include "exec/workgroup/work_group_fwd.h"
#include "exprs/runtime_filter_bank.h"
Expand Down Expand Up @@ -195,6 +197,26 @@ enum OperatorStage {
class PipelineDriver {
friend class PipelineDriverPoller;

public:
class ScheduleToken {
public:
ScheduleToken(DriverRawPtr driver, bool acquired) : _driver(driver), _acquired(acquired) {}
~ScheduleToken() {
if (_acquired) {
_driver->_schedule_token = true;
}
}

ScheduleToken(const ScheduleToken&) = delete;
void operator=(const ScheduleToken&) = delete;

bool acquired() const { return _acquired; }

private:
DriverRawPtr _driver;
bool _acquired;
};

public:
PipelineDriver(const Operators& operators, QueryContext* query_ctx, FragmentContext* fragment_ctx,
Pipeline* pipeline, int32_t driver_id)
Expand All @@ -203,11 +225,13 @@ class PipelineDriver {
_fragment_ctx(fragment_ctx),
_pipeline(pipeline),
_source_node_id(operators[0]->get_plan_node_id()),
_driver_id(driver_id) {
_driver_id(driver_id),
_observer(this) {
_runtime_profile = std::make_shared<RuntimeProfile>(strings::Substitute("PipelineDriver (id=$0)", _driver_id));
for (auto& op : _operators) {
_operator_stages[op->get_id()] = OperatorStage::INIT;
}

_driver_name = fmt::sprintf("driver_%d_%d", _source_node_id, _driver_id);
}

Expand Down Expand Up @@ -336,7 +360,6 @@ class PipelineDriver {
if (_all_global_rf_ready_or_timeout) {
return false;
}

_all_global_rf_ready_or_timeout =
_precondition_block_timer_sw->elapsed_time() >= _global_rf_wait_timeout_ns || // Timeout,
std::all_of(_global_rf_descriptors.begin(), _global_rf_descriptors.end(), [](auto* rf_desc) {
Expand Down Expand Up @@ -440,7 +463,27 @@ class PipelineDriver {
void set_driver_queue_level(size_t driver_queue_level) { _driver_queue_level = driver_queue_level; }

inline bool is_in_ready_queue() const { return _in_ready_queue.load(std::memory_order_acquire); }
void set_in_ready_queue(bool v) { _in_ready_queue.store(v, std::memory_order_release); }
void set_in_ready_queue(bool v) {
SCHEDULE_CHECK(!v || !is_in_ready_queue());
_in_ready_queue.store(v, std::memory_order_release);
}

bool is_in_block_queue() const { return _in_block_queue.load(std::memory_order_acquire); }
void set_in_block_queue(bool v) {
SCHEDULE_CHECK(!v || !is_in_block_queue());
SCHEDULE_CHECK(!is_in_ready_queue());
_in_block_queue.store(v, std::memory_order_release);
}

ScheduleToken acquire_schedule_token() {
bool val = false;
return {this, _schedule_token.compare_exchange_strong(val, true)};
stdpain marked this conversation as resolved.
Show resolved Hide resolved
}

DECLARE_RACE_DETECTOR(schedule)

bool need_check_reschedule() const { return _need_check_reschedule; }
void set_need_check_reschedule(bool need_reschedule) { _need_check_reschedule = need_reschedule; }

inline std::string get_name() const { return strings::Substitute("PipelineDriver (id=$0)", _driver_id); }

Expand All @@ -456,14 +499,18 @@ class PipelineDriver {
return source_operator()->is_epoch_finishing() || sink_operator()->is_epoch_finishing();
}

PipelineObserver* observer() { return &_observer; }
void assign_observer();

protected:
PipelineDriver()
: _operators(),
_query_ctx(nullptr),
_fragment_ctx(nullptr),
_pipeline(nullptr),
_source_node_id(0),
_driver_id(0) {}
_driver_id(0),
_observer(this) {}

// Yield PipelineDriver when maximum time in nano-seconds has spent in current execution round.
static constexpr int64_t YIELD_MAX_TIME_SPENT_NS = 100'000'000L;
Expand Down Expand Up @@ -526,9 +573,16 @@ class PipelineDriver {
// The index of QuerySharedDriverQueue._queues which this driver belongs to.
size_t _driver_queue_level = 0;
std::atomic<bool> _in_ready_queue{false};
// Indicates whether it is in a block queue. Only used in EventScheduler mode.
std::atomic<bool> _in_block_queue{false};

std::atomic<bool> _schedule_token{true};
// Indicates if the block queue needs to be checked when it is added to the block queue. See EventScheduler for details.
std::atomic<bool> _need_check_reschedule{false};

std::atomic<bool> _has_log_cancelled{false};

PipelineObserver _observer;
// metrics
RuntimeProfile::Counter* _total_timer = nullptr;
RuntimeProfile::Counter* _active_timer = nullptr;
Expand Down
19 changes: 19 additions & 0 deletions be/src/exec/pipeline/schedule/common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#define TRACE_SCHEDULE_LOG VLOG(10)

#define SCHEDULE_CHECK CHECK
87 changes: 87 additions & 0 deletions be/src/exec/pipeline/schedule/event_scheduler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/pipeline/schedule/event_scheduler.h"

#include "exec/pipeline/pipeline_driver.h"
#include "exec/pipeline/pipeline_driver_queue.h"
#include "exec/pipeline/pipeline_fwd.h"
#include "exec/pipeline/schedule/common.h"
#include "exec/pipeline/schedule/utils.h"

namespace starrocks::pipeline {

void EventScheduler::add_blocked_driver(const DriverRawPtr driver) {
// Capture query-context is needed before calling reschedule to avoid UAF
auto query_ctx = driver->fragment_ctx()->runtime_state()->query_ctx()->shared_from_this();
SCHEDULE_CHECK(!driver->is_in_block_queue());
driver->set_in_block_queue(true);
TRACE_SCHEDULE_LOG << "TRACE add to block queue:" << driver << "," << driver->to_readable_string();
auto token = driver->acquire_schedule_token();
// The driver is ready put to block queue. but is_in_block_queue is false, but the driver is active.
// set this flag to make the block queue should check the driver is active
if (!token.acquired() || driver->need_check_reschedule()) {
driver->observer()->cancel_update();
}
}

// For a single driver try_schedule has no concurrency.
void EventScheduler::try_schedule(const DriverRawPtr driver) {
SCHEDULE_CHECK(driver->is_in_block_queue());
bool add_to_ready_queue = false;
RACE_DETECT(driver->schedule);

// The logic in the pipeline poller is basically the same.
auto fragment_ctx = driver->fragment_ctx();
if (fragment_ctx->is_canceled()) {
add_to_ready_queue = on_cancel(driver);
} else if (driver->need_report_exec_state()) {
add_to_ready_queue = true;
} else if (driver->pending_finish()) {
if (!driver->is_still_pending_finish()) {
driver->set_driver_state(fragment_ctx->is_canceled() ? DriverState::CANCELED : DriverState::FINISH);
add_to_ready_queue = true;
}
} else if (driver->is_finished()) {
add_to_ready_queue = true;
} else {
auto status_or_is_not_blocked = driver->is_not_blocked();
if (!status_or_is_not_blocked.ok()) {
fragment_ctx->cancel(status_or_is_not_blocked.status());
add_to_ready_queue = on_cancel(driver);
} else if (status_or_is_not_blocked.value()) {
driver->set_driver_state(DriverState::READY);
add_to_ready_queue = true;
}
}

if (add_to_ready_queue) {
TRACE_SCHEDULE_LOG << "TRACE schedule driver:" << driver << " to ready queue";
driver->set_need_check_reschedule(false);
driver->set_in_block_queue(false);
stdpain marked this conversation as resolved.
Show resolved Hide resolved
_driver_queue->put_back(driver);
}
}

bool EventScheduler::on_cancel(DriverRawPtr driver) {
driver->cancel_operators(driver->fragment_ctx()->runtime_state());
if (driver->is_still_pending_finish()) {
driver->set_driver_state(DriverState::PENDING_FINISH);
return false;
} else {
driver->set_driver_state(DriverState::CANCELED);
return true;
}
}
} // namespace starrocks::pipeline
42 changes: 42 additions & 0 deletions be/src/exec/pipeline/schedule/event_scheduler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "exec/pipeline/pipeline_fwd.h"
#include "gutil/macros.h"

namespace starrocks::pipeline {
class DriverQueue;
class EventScheduler {
public:
EventScheduler() = default;
DISALLOW_COPY(EventScheduler);

void add_blocked_driver(const DriverRawPtr driver);

void try_schedule(const DriverRawPtr driver);

bool on_cancel(DriverRawPtr driver);

void attach_queue(DriverQueue* queue) {
if (_driver_queue == nullptr) {
_driver_queue = queue;
}
}

private:
DriverQueue* _driver_queue = nullptr;
};
} // namespace starrocks::pipeline
Loading
Loading