Skip to content

Commit

Permalink
[Feature] support phased scheduler
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <[email protected]>
  • Loading branch information
stdpain committed Dec 24, 2024
1 parent 33da390 commit 4eb5909
Show file tree
Hide file tree
Showing 20 changed files with 868 additions and 1 deletion.
4 changes: 4 additions & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,10 @@ set(EXEC_FILES
pipeline/group_execution/execution_group.cpp
pipeline/group_execution/execution_group_builder.cpp
pipeline/group_execution/group_operator.cpp
pipeline/schedule/event_scheduler.cpp
pipeline/schedule/observer.cpp
pipeline/schedule/pipeline_timer.cpp
pipeline/schedule/timeout_tasks.cpp
workgroup/pipeline_executor_set.cpp
workgroup/pipeline_executor_set_manager.cpp
workgroup/work_group.cpp
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/pipeline/fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,4 +404,10 @@ void FragmentContext::_close_stream_load_contexts() {
}
}

// Creates the fragment's EventScheduler and records in the runtime profile
// whether an event scheduler is present.
void FragmentContext::init_event_scheduler() {
    _event_scheduler = std::make_unique<EventScheduler>();

    // enable_event_scheduler() reports whether _event_scheduler is non-null,
    // so it is evaluated after the scheduler has been created above.
    const char* enabled_text = enable_event_scheduler() ? "true" : "false";
    runtime_state()->runtime_profile()->add_info_string("EnableEventScheduler", enabled_text);
}

} // namespace starrocks::pipeline
6 changes: 6 additions & 0 deletions be/src/exec/pipeline/fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ class FragmentContext {
// acquire runtime filter from cache
void acquire_runtime_filters();

// True once an EventScheduler has been created for this fragment
// (i.e. event_scheduler() returns a non-null pointer).
bool enable_event_scheduler() const { return event_scheduler() != nullptr; }
// Non-owning access to the fragment's EventScheduler; nullptr when none has been created.
EventScheduler* event_scheduler() const { return _event_scheduler.get(); }
// Creates the EventScheduler owned by this fragment context.
void init_event_scheduler();

private:
void _close_stream_load_contexts();

Expand Down Expand Up @@ -235,6 +239,8 @@ class FragmentContext {
RuntimeProfile::Counter* _jit_timer = nullptr;

bool _report_when_finish{};

std::unique_ptr<EventScheduler> _event_scheduler;
};

class FragmentContextManager {
Expand Down
8 changes: 8 additions & 0 deletions be/src/exec/pipeline/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "column/vectorized_fwd.h"
#include "common/statusor.h"
#include "exec/pipeline/runtime_filter_types.h"
#include "exec/pipeline/schedule/observer.h"
#include "exec/spill/operator_mem_resource_manager.h"
#include "exprs/runtime_filter_bank.h"
#include "gutil/strings/substitute.h"
Expand Down Expand Up @@ -269,6 +270,9 @@ class Operator {

virtual void update_exec_stats(RuntimeState* state);

// Stores the raw observer pointer on this operator. The operator does not
// appear to take ownership here — NOTE(review): confirm the observer outlives the operator.
void set_observer(PipelineObserver* observer) { _observer = observer; }
// Returns the pointer stored by set_observer(); nullptr if none was set.
PipelineObserver* observer() const { return _observer; }

protected:
OperatorFactory* _factory;
const int32_t _id;
Expand Down Expand Up @@ -323,6 +327,8 @@ class Operator {
// such as OlapScanOperator( use separated IO thread to execute the IO task)
std::atomic_int64_t _last_growth_cpu_time_ns = 0;

PipelineObserver* _observer = nullptr;

private:
void _init_rf_counters(bool init_bloom);
void _init_conjuct_counters();
Expand Down Expand Up @@ -408,6 +414,8 @@ class OperatorFactory {
// try to get runtime filter from cache
void acquire_runtime_filter(RuntimeState* state);

// Whether operators created by this factory can be driven by the event scheduler.
// The base implementation returns false; presumably factories that support it
// override this to return true — confirm against the concrete factories.
virtual bool support_event_scheduler() const { return false; }

protected:
void _prepare_runtime_in_filters(RuntimeState* state);
void _prepare_runtime_holders(const std::vector<RuntimeFilterHolder*>& holders,
Expand Down
19 changes: 19 additions & 0 deletions be/src/exec/pipeline/schedule/common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

// Log stream for scheduler trace messages; forwards to VLOG at verbosity level 10.
#define TRACE_SCHEDULE_LOG VLOG(10)

// Invariant check used by the scheduling code; currently expands to CHECK.
#define SCHEDULE_CHECK CHECK
87 changes: 87 additions & 0 deletions be/src/exec/pipeline/schedule/event_scheduler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/pipeline/schedule/event_scheduler.h"

#include "exec/pipeline/pipeline_driver.h"
#include "exec/pipeline/pipeline_driver_queue.h"
#include "exec/pipeline/pipeline_fwd.h"
#include "exec/pipeline/schedule/common.h"
#include "exec/pipeline/schedule/utils.h"

namespace starrocks::pipeline {

// Marks `driver` as being in the block queue and, if an observer update may
// have raced with that transition, asks the driver's observer to re-check.
//
// Precondition (enforced with SCHEDULE_CHECK): the driver is not already
// flagged as being in the block queue.
void EventScheduler::add_blocked_driver(const DriverRawPtr driver) {
// Keep the query context alive for the whole call. It must be captured before
// anything below can reschedule the driver, to avoid a use-after-free.
auto query_ctx = driver->fragment_ctx()->runtime_state()->query_ctx()->shared_from_this();
SCHEDULE_CHECK(!driver->is_in_block_queue());
driver->set_in_block_queue(true);
TRACE_SCHEDULE_LOG << "TRACE add to block queue:" << driver << "," << driver->to_readable_string();
auto token = driver->acquire_schedule_token();
// An update may have arrived while the driver was still being moved into the
// block queue: either the schedule token could not be acquired here, or
// need_check_reschedule was set (PipelineObserver::_do_update sets it when it
// sees a driver that is not yet in the block queue). In either case trigger
// cancel_update() on the observer so the driver is examined again.
// NOTE(review): the exact effect of cancel_update() is not visible here — confirm.
if (!token.acquired() || driver->need_check_reschedule()) {
driver->observer()->cancel_update();
}
}

// try_schedule is never called concurrently for the same driver.
// Re-evaluates a driver that is currently in the block queue and, if it can
// make progress (or must be finalized), moves it back to the driver queue.
//
// Precondition (enforced with SCHEDULE_CHECK): the driver is flagged as being
// in the block queue. If the driver is still blocked, nothing is changed apart
// from any state transition performed by the branches below.
void EventScheduler::try_schedule(const DriverRawPtr driver) {
SCHEDULE_CHECK(driver->is_in_block_queue());
bool add_to_ready_queue = false;
RACE_DETECT(driver->schedule);

// The logic in the pipeline poller is basically the same.
auto fragment_ctx = driver->fragment_ctx();
if (fragment_ctx->is_canceled()) {
// Fragment was canceled: cancel the operators; on_cancel() decides whether the
// driver can be handed back now or is still pending finish.
add_to_ready_queue = on_cancel(driver);
} else if (driver->need_report_exec_state()) {
// The driver has an execution state to report: hand it back to the queue.
add_to_ready_queue = true;
} else if (driver->pending_finish()) {
// Only leave the pending-finish state once the driver reports that it is no
// longer pending; otherwise keep it blocked.
if (!driver->is_still_pending_finish()) {
driver->set_driver_state(fragment_ctx->is_canceled() ? DriverState::CANCELED : DriverState::FINISH);
add_to_ready_queue = true;
}
} else if (driver->is_finished()) {
add_to_ready_queue = true;
} else {
// Ordinary blocked driver: ask whether it is still blocked. An error status
// cancels the whole fragment and is then handled like a cancellation.
auto status_or_is_not_blocked = driver->is_not_blocked();
if (!status_or_is_not_blocked.ok()) {
fragment_ctx->cancel(status_or_is_not_blocked.status());
add_to_ready_queue = on_cancel(driver);
} else if (status_or_is_not_blocked.value()) {
driver->set_driver_state(DriverState::READY);
add_to_ready_queue = true;
}
}

if (add_to_ready_queue) {
TRACE_SCHEDULE_LOG << "TRACE schedule driver:" << driver << " to ready queue";
// Clear both scheduling flags before the driver is put back on the queue.
driver->set_need_check_reschedule(false);
driver->set_in_block_queue(false);
// NOTE(review): assumes attach_queue() has been called; _driver_queue is not null-checked.
_driver_queue->put_back(driver);
}
}

// Cancels the driver's operators and moves the driver to its post-cancel state.
//
// Returns true when the driver was set to CANCELED and may be handed back to
// the driver queue; returns false when it is still pending finish, in which
// case it is set to PENDING_FINISH and must stay blocked.
bool EventScheduler::on_cancel(DriverRawPtr driver) {
    driver->cancel_operators(driver->fragment_ctx()->runtime_state());

    const bool still_pending = driver->is_still_pending_finish();
    driver->set_driver_state(still_pending ? DriverState::PENDING_FINISH : DriverState::CANCELED);
    return !still_pending;
}
} // namespace starrocks::pipeline
42 changes: 42 additions & 0 deletions be/src/exec/pipeline/schedule/event_scheduler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "exec/pipeline/pipeline_fwd.h"
#include "gutil/macros.h"

namespace starrocks::pipeline {
class DriverQueue;
// Decides when a blocked pipeline driver can be handed back to its driver
// queue. Drivers are registered with add_blocked_driver() and re-evaluated
// through try_schedule().
class EventScheduler {
public:
    EventScheduler() = default;
    DISALLOW_COPY(EventScheduler);

    // Flags `driver` as being in the block queue.
    void add_blocked_driver(const DriverRawPtr driver);

    // Re-checks a driver that is in the block queue and puts it back on the
    // attached driver queue when it no longer needs to stay blocked.
    void try_schedule(const DriverRawPtr driver);

    // Cancels the driver's operators. Returns true if the driver ended up
    // CANCELED, false if it is still pending finish.
    bool on_cancel(DriverRawPtr driver);

    // Binds the driver queue used by try_schedule(). Has no effect once a
    // non-null queue has already been attached.
    void attach_queue(DriverQueue* queue) {
        if (_driver_queue != nullptr) {
            return;
        }
        _driver_queue = queue;
    }

private:
    // Raw pointer to the queue that receives rescheduled drivers; set via attach_queue().
    DriverQueue* _driver_queue = nullptr;
};
} // namespace starrocks::pipeline
79 changes: 79 additions & 0 deletions be/src/exec/pipeline/schedule/observer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/pipeline/schedule/observer.h"

#include "exec/pipeline/pipeline_driver.h"
#include "exec/pipeline/schedule/common.h"

namespace starrocks::pipeline {
// Handles an update that may concern both ends of the pipeline: the driver is
// offered to the event scheduler when its sink is finished or needs input, or
// when its source is finished or has output.
static void on_update(PipelineDriver* driver) {
    auto sink_op = driver->sink_operator();
    auto source_op = driver->source_operator();

    TRACE_SCHEDULE_LOG << "notify driver:" << driver << " state:" << driver->driver_state()
                       << " in_block_queue:" << driver->is_in_block_queue()
                       << " source finished:" << source_op->is_finished()
                       << " operator has output:" << source_op->has_output()
                       << " sink finished:" << sink_op->is_finished()
                       << " sink need input:" << sink_op->need_input() << ":" << driver->to_readable_string();

    // Same evaluation order as a single short-circuit chain: sink first, then source.
    const bool can_progress = sink_op->is_finished() || sink_op->need_input() || source_op->is_finished() ||
                              source_op->has_output();
    if (!can_progress) {
        return;
    }
    driver->fragment_ctx()->event_scheduler()->try_schedule(driver);
}

// Handles a sink-side update: the driver is offered to the event scheduler
// only when its sink operator is finished or is ready to accept input.
static void on_sink_update(PipelineDriver* driver) {
    auto sink_op = driver->sink_operator();
    const bool sink_ready = sink_op->is_finished() || sink_op->need_input();
    if (!sink_ready) {
        return;
    }
    driver->fragment_ctx()->event_scheduler()->try_schedule(driver);
}

// Handles a source-side update: the driver is offered to the event scheduler
// only when its source operator is finished or has output available.
static void on_source_update(PipelineDriver* driver) {
    auto source_op = driver->source_operator();
    const bool source_ready = source_op->is_finished() || source_op->has_output();
    if (!source_ready) {
        return;
    }
    driver->fragment_ctx()->event_scheduler()->try_schedule(driver);
}

// Reacts to an observer event for the driver attached to this observer.
//
// If the driver is not (yet) in the block queue, only need_check_reschedule is
// set so that the update is not lost. Otherwise:
//   * a driver blocked for a reason other than INPUT_EMPTY / OUTPUT_FULL, or
//     any cancel event, is always re-evaluated via try_schedule();
//   * a driver blocked on input/output is re-evaluated only when the event
//     concerns the relevant side and that side can make progress.
//
// `event` is a bit set decoded by the _is_*_changed() helpers.
void PipelineObserver::_do_update(int event) {
    auto driver = _driver;
    // Keep the schedule token alive for the whole update.
    // NOTE(review): presumably this coordinates with EventScheduler::add_blocked_driver — confirm.
    auto token = driver->acquire_schedule_token();

    if (!driver->is_in_block_queue()) {
        // The driver is not parked yet; leave a marker so the update is re-checked later.
        driver->set_need_check_reschedule(true);
        return;
    }

    // Fix: the previous condition combined the two inequalities with `||`, which
    // is true for every state and made the targeted branches below unreachable.
    // The driver is "pipeline blocked" only when its state is neither
    // INPUT_EMPTY nor OUTPUT_FULL.
    const auto state = driver->driver_state();
    const bool pipeline_block = state != DriverState::INPUT_EMPTY && state != DriverState::OUTPUT_FULL;

    if (pipeline_block || _is_cancel_changed(event)) {
        driver->fragment_ctx()->event_scheduler()->try_schedule(driver);
    } else if (_is_all_changed(event)) {
        on_update(driver);
    } else if (_is_source_changed(event)) {
        on_source_update(driver);
    } else if (_is_sink_changed(event)) {
        on_sink_update(driver);
    }
    // Any other event: nothing to do.
}

std::string Observable::to_string() const {
std::string str;
for (auto* observer : _observers) {
str += observer->driver()->to_readable_string() + "\n";
}
return str;
}

} // namespace starrocks::pipeline
Loading

0 comments on commit 4eb5909

Please sign in to comment.