Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <[email protected]>
  • Loading branch information
stdpain committed Dec 25, 2024
1 parent 2fb8a1e commit d6b4d28
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 40 deletions.
28 changes: 16 additions & 12 deletions be/src/exec/pipeline/pipeline_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ class PipelineDriver {
friend class PipelineDriverPoller;

public:
// used in event scheduler
// If event_scheduler doesn't get the token, it proves that the observer has entered the critical zone,
// and we don't know the real state of the driver, so we need to try scheduling the driver one more time.
class ScheduleToken {
public:
ScheduleToken(DriverRawPtr driver, bool acquired) : _driver(driver), _acquired(acquired) {}
Expand Down Expand Up @@ -462,17 +465,17 @@ class PipelineDriver {
size_t get_driver_queue_level() const { return _driver_queue_level; }
void set_driver_queue_level(size_t driver_queue_level) { _driver_queue_level = driver_queue_level; }

inline bool is_in_ready_queue() const { return _in_ready_queue.load(std::memory_order_acquire); }
void set_in_ready_queue(bool v) {
SCHEDULE_CHECK(!v || !is_in_ready_queue());
_in_ready_queue.store(v, std::memory_order_release);
inline bool is_in_ready() const { return _in_ready.load(std::memory_order_acquire); }
void set_in_ready(bool v) {
SCHEDULE_CHECK(!v || !is_in_ready());
_in_ready.store(v, std::memory_order_release);
}

bool is_in_block_queue() const { return _in_block_queue.load(std::memory_order_acquire); }
void set_in_block_queue(bool v) {
SCHEDULE_CHECK(!v || !is_in_block_queue());
SCHEDULE_CHECK(!is_in_ready_queue());
_in_block_queue.store(v, std::memory_order_release);
bool is_in_blocked() const { return _in_blocked.load(std::memory_order_acquire); }
void set_in_blocked(bool v) {
SCHEDULE_CHECK(!v || !is_in_blocked());
SCHEDULE_CHECK(!is_in_ready());
_in_blocked.store(v, std::memory_order_release);
}

ScheduleToken acquire_schedule_token() {
Expand Down Expand Up @@ -572,9 +575,10 @@ class PipelineDriver {
DriverQueue* _in_queue = nullptr;
// The index of QuerySharedDriverQueue._queues which this driver belongs to.
size_t _driver_queue_level = 0;
std::atomic<bool> _in_ready_queue{false};
// Indicates whether it is in a block queue. Only used in EventScheduler mode.
std::atomic<bool> _in_block_queue{false};
// Indicates whether it is in a ready queue.
std::atomic<bool> _in_ready{false};
// Indicates whether it is in a block states. Only used when enable event scheduler mode.
std::atomic<bool> _in_blocked{false};

std::atomic<bool> _schedule_token{true};
// Indicates if the block queue needs to be checked when it is added to the block queue. See EventScheduler for details.
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/pipeline_driver_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ void GlobalDriverExecutor::submit(DriverRawPtr driver) {
void GlobalDriverExecutor::cancel(DriverRawPtr driver) {
// if driver is already in ready queue, we should cancel it
// otherwise, just ignore it and wait for the poller to schedule
if (driver->is_in_ready_queue()) {
if (driver->is_in_ready()) {
this->_driver_queue->cancel(driver);
}
}
Expand Down
12 changes: 6 additions & 6 deletions be/src/exec/pipeline/pipeline_driver_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void QuerySharedDriverQueue::put_back(const DriverRawPtr driver) {
{
std::lock_guard<std::mutex> lock(_global_mutex);
_queues[level].put(driver);
driver->set_in_ready_queue(true);
driver->set_in_ready(true);
driver->set_in_queue(this);
driver->update_peak_driver_queue_size_counter(_num_drivers);
_cv.notify_one();
Expand All @@ -67,7 +67,7 @@ void QuerySharedDriverQueue::put_back(const std::vector<DriverRawPtr>& drivers)
std::lock_guard<std::mutex> lock(_global_mutex);
for (int i = 0; i < drivers.size(); i++) {
_queues[levels[i]].put(drivers[i]);
drivers[i]->set_in_ready_queue(true);
drivers[i]->set_in_ready(true);
drivers[i]->set_in_queue(this);
drivers[i]->update_peak_driver_queue_size_counter(_num_drivers);
_cv.notify_one();
Expand Down Expand Up @@ -117,7 +117,7 @@ StatusOr<DriverRawPtr> QuerySharedDriverQueue::take(const bool block) {
if (queue_idx >= 0) {
// record queue's index to accumulate time for it.
driver_ptr = _queues[queue_idx].take(false);
driver_ptr->set_in_ready_queue(false);
driver_ptr->set_in_ready(false);

--_num_drivers;
}
Expand All @@ -132,7 +132,7 @@ void QuerySharedDriverQueue::cancel(DriverRawPtr driver) {
if (_is_closed) {
return;
}
if (!driver->is_in_ready_queue()) {
if (!driver->is_in_ready()) {
return;
}
int level = driver->get_driver_queue_level();
Expand Down Expand Up @@ -174,7 +174,7 @@ void SubQuerySharedDriverQueue::put(const DriverRawPtr driver) {

void SubQuerySharedDriverQueue::cancel(const DriverRawPtr driver) {
if (cancelled_set.count(driver) == 0) {
DCHECK(driver->is_in_ready_queue());
DCHECK(driver->is_in_ready());
pending_cancel_queue.emplace(driver);
}
}
Expand Down Expand Up @@ -291,7 +291,7 @@ void WorkGroupDriverQueue::cancel(DriverRawPtr driver) {
if (_is_closed) {
return;
}
if (!driver->is_in_ready_queue()) {
if (!driver->is_in_ready()) {
return;
}
auto* wg_entity = driver->workgroup()->driver_sched_entity();
Expand Down
10 changes: 5 additions & 5 deletions be/src/exec/pipeline/schedule/event_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,20 @@ namespace starrocks::pipeline {
void EventScheduler::add_blocked_driver(const DriverRawPtr driver) {
// Capture query-context is needed before calling reschedule to avoid UAF
auto query_ctx = driver->fragment_ctx()->runtime_state()->query_ctx()->shared_from_this();
SCHEDULE_CHECK(!driver->is_in_block_queue());
driver->set_in_block_queue(true);
SCHEDULE_CHECK(!driver->is_in_blocked());
driver->set_in_blocked(true);
TRACE_SCHEDULE_LOG << "TRACE add to block queue:" << driver << "," << driver->to_readable_string();
auto token = driver->acquire_schedule_token();
// The driver is ready put to block queue. but is_in_block_queue is false, but the driver is active.
// set this flag to make the block queue should check the driver is active
if (!token.acquired() || driver->need_check_reschedule()) {
driver->observer()->cancel_update();
driver->observer()->cancel_trigger();
}
}

// For a single driver try_schedule has no concurrency.
void EventScheduler::try_schedule(const DriverRawPtr driver) {
SCHEDULE_CHECK(driver->is_in_block_queue());
SCHEDULE_CHECK(driver->is_in_blocked());
bool add_to_ready_queue = false;
RACE_DETECT(driver->schedule);

Expand Down Expand Up @@ -69,7 +69,7 @@ void EventScheduler::try_schedule(const DriverRawPtr driver) {
if (add_to_ready_queue) {
TRACE_SCHEDULE_LOG << "TRACE schedule driver:" << driver << " to ready queue";
driver->set_need_check_reschedule(false);
driver->set_in_block_queue(false);
driver->set_in_blocked(false);
_driver_queue->put_back(driver);
}
}
Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/pipeline/schedule/observer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ void PipelineObserver::_do_update(int event) {
auto driver = _driver;
auto token = driver->acquire_schedule_token();

if (driver->is_in_block_queue()) {
if (driver->is_in_blocked()) {
// In PRECONDITION state, has_output need_input may return false. In this case,
// we need to schedule the driver to INPUT_EMPTY/OUTPUT_FULL state.
bool pipeline_block = driver->driver_state() != DriverState::INPUT_EMPTY ||
driver->driver_state() != DriverState::OUTPUT_FULL;
if (pipeline_block || _is_cancel_changed(event)) {
Expand Down
12 changes: 6 additions & 6 deletions be/src/exec/pipeline/schedule/observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,22 @@ class PipelineObserver {

DISALLOW_COPY_AND_MOVE(PipelineObserver);

void source_update() {
void source_trigger() {
_active_event(SOURCE_CHANGE_EVENT);
_update([this](int event) { _do_update(event); });
}

void sink_update() {
void sink_trigger() {
_active_event(SINK_CHANGE_EVENT);
_update([this](int event) { _do_update(event); });
}

void cancel_update() {
void cancel_trigger() {
_active_event(CANCEL_EVENT);
_update([this](int event) { _do_update(event); });
}

void all_update() {
void all_trigger() {
_active_event(SOURCE_CHANGE_EVENT | SINK_CHANGE_EVENT);
_update([this](int event) { _do_update(event); });
}
Expand Down Expand Up @@ -111,12 +111,12 @@ class Observable {

void notify_source_observers() {
for (auto* observer : _observers) {
observer->source_update();
observer->source_trigger();
}
}
void notify_sink_observers() {
for (auto* observer : _observers) {
observer->sink_update();
observer->sink_trigger();
}
}

Expand Down
18 changes: 9 additions & 9 deletions be/test/exec/pipeline/schedule/observer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,17 +212,17 @@ TEST_F(PipelineObserverTest, basic_test) {
const auto& driver = tx.driver;
const auto& driver_queue = tx.driver_queue;

driver->set_in_block_queue(true);
driver->set_in_blocked(true);
driver->set_driver_state(DriverState::INPUT_EMPTY);
// test notify
driver->observer()->all_update();
driver->observer()->all_trigger();
ASSERT_OK(driver_queue->take(false));
driver->observer()->cancel_update();
driver->observer()->cancel_trigger();
ASSERT_OK(driver_queue->take(false));
driver->observer()->sink_update();
driver->observer()->sink_trigger();
ASSERT_OK(driver_queue->take(false));
driver->observer()->source_update();
driver->observer()->source_update();
driver->observer()->source_trigger();
driver->observer()->source_trigger();
}

TEST_F(PipelineObserverTest, test_obs) {
Expand All @@ -233,7 +233,7 @@ TEST_F(PipelineObserverTest, test_obs) {
ASSERT_OK(tx.driver->prepare(_runtime_state.get()));
const auto& driver = tx.driver;

driver->set_in_block_queue(true);
driver->set_in_blocked(true);
driver->set_driver_state(DriverState::PENDING_FINISH);
Observable obs;
_runtime_state->set_enable_event_scheduler(true);
Expand All @@ -254,8 +254,8 @@ TEST_F(PipelineObserverTest, test_cancel) {

driver->set_driver_state(DriverState::INPUT_EMPTY);
_dummy_fragment_ctx->cancel(Status::InternalError("error"));
driver->set_in_block_queue(true);
driver->observer()->all_update();
driver->set_in_blocked(true);
driver->observer()->all_trigger();
for (size_t i = 0; i < driver->_operator_stages.size(); ++i) {
driver->_operator_stages[i] = OperatorStage::CLOSED;
}
Expand Down

0 comments on commit d6b4d28

Please sign in to comment.