[WIP][Enhancement] Support poller event #49972
@@ -227,6 +320,7 @@ size_t PipelineDriverPoller::calculate_parked_driver(const ImmutableDriverPredic
}

void PipelineDriverPoller::remove_blocked_driver(DriverList& local_blocked_drivers, DriverList::iterator& driver_it) {
    _ready_count++;
    auto& driver = *driver_it;
    driver->_pending_timer->update(driver->_pending_timer_sw->elapsed_time());
    local_blocked_drivers.erase(driver_it++);
The riskiest bug in this code is:
The while loop in backup_run_internal never increments the iterator driver_it, so it loops forever.
You can modify the code like this:
void PipelineDriverPoller::backup_run_internal() {
    DriverList tmp_drivers;
    while (!_is_shutdown.load(std::memory_order_acquire)) {
        _backup_count++;
        {
            std::unique_lock<std::mutex> lock(_backup_mutex);
            tmp_drivers.assign(_backup_drivers.begin(), _backup_drivers.end());
        }
        auto driver_it = tmp_drivers.begin();
        while (driver_it != tmp_drivers.end()) {
            auto* driver = *driver_it;
            if (!driver->is_query_never_expired() && driver->query_ctx()->is_query_expired()) {
                upgrade_to_blocked_driver(driver);
            } else if (driver->fragment_ctx()->is_canceled() || driver->need_report_exec_state()) {
                upgrade_to_blocked_driver(driver);
            } else if ((driver->pending_finish() && driver->is_still_pending_finish()) ||
                       (driver->is_epoch_finishing() && driver->is_still_epoch_finishing()) ||
                       driver->is_epoch_finished() || driver->is_finished()) {
                upgrade_to_blocked_driver(driver);
            } else {
                auto status_or_is_not_blocked = driver->is_not_blocked();
                if (status_or_is_not_blocked.ok() && status_or_is_not_blocked.value()) {
                    upgrade_to_blocked_driver(driver);
                }
            }
            // Increment the iterator to avoid an infinite loop
            ++driver_it;
        }
        tmp_drivers.clear();
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}
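The termination concern raised in this comment also applies to loops that erase while iterating, as the erase(driver_it++) call in the diff does. The following is a minimal, self-contained sketch of that pattern on a std::list. FakeDriver and drain_ready are hypothetical names invented for illustration and are not part of the StarRocks code base.

```cpp
#include <list>

// Hypothetical stand-in for a pipeline driver; only the fields this sketch needs.
struct FakeDriver {
    int id;
    bool ready;
};

// Moves every ready driver from `blocked` to `ready_out` and returns how many moved.
// Each branch advances the iterator exactly once, so the loop always terminates.
inline int drain_ready(std::list<FakeDriver>& blocked, std::list<FakeDriver>& ready_out) {
    int removed = 0;
    auto it = blocked.begin();
    while (it != blocked.end()) {
        if (it->ready) {
            ready_out.push_back(*it);
            // std::list::erase returns the iterator following the erased element.
            it = blocked.erase(it);
            ++removed;
        } else {
            ++it;
        }
    }
    return removed;
}
```

Assigning the result of erase is equivalent to the erase(it++) form used in the diff for std::list; either way, the point is that no path through the loop body leaves the iterator where it was.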
    std::vector<std::shared_ptr<PipelineObserver>> _observers;
};

} // namespace starrocks::pipeline
The riskiest bug in this code is:
A dangling reference is possible if the observer function object has a shorter lifetime than expected, which leads to undefined behavior.
You can modify the code like this:
#include <functional>
#include <memory>
#include <vector>

namespace starrocks::pipeline {
class PipelineDriver;
using ObserverFunc = std::function<void()>;

class PipelineObserver : public std::enable_shared_from_this<PipelineObserver> {
public:
    static std::shared_ptr<PipelineObserver> create(const ObserverFunc& fn) {
        return std::make_shared<PipelineObserver>(fn);
    }
    // Copies the callable, so the observer owns its own function object.
    explicit PipelineObserver(const ObserverFunc& fn) : _observer_fn(fn) {}
    void update() {
        _observer_fn();
    }

private:
    ObserverFunc _observer_fn;
};
using PipelineObserverPtr = std::shared_ptr<PipelineObserver>;

class PipelinePublisher {
public:
    void attach(PipelineObserverPtr observer) {
        _observers.emplace_back(observer);
    }
    void notify() {
        for (const auto& ob : _observers) {
            if (auto observer = ob.lock()) { // Use weak_ptr to prevent prolonging the lifespan of observers unintentionally
                observer->update();
            }
        }
    }

private:
    std::vector<std::weak_ptr<PipelineObserver>> _observers; // Change to weak_ptr to avoid cyclic references
};
} // namespace starrocks::pipeline
Key modifications:
- PipelineObserver now inherits from std::enable_shared_from_this, ensuring safe usage of shared pointers.
- Changed std::shared_ptr<PipelineObserver> to std::weak_ptr<PipelineObserver> in _observers to prevent reference cycles and manage observer lifetimes correctly.
- Added checks when accessing weak pointers to ensure they are still valid.
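To make the weak_ptr behaviour described in these modifications concrete, here is a reduced sketch that can be compiled on its own. Observer and Publisher are simplified, hypothetical stand-ins for PipelineObserver and PipelinePublisher, and the int return value of notify() is added purely so the effect can be observed.

```cpp
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Simplified, hypothetical observer: owns a copy of its callback.
class Observer {
public:
    explicit Observer(std::function<void()> fn) : _fn(std::move(fn)) {}
    void update() const { _fn(); }

private:
    std::function<void()> _fn;
};

// Simplified, hypothetical publisher that holds non-owning references.
class Publisher {
public:
    void attach(const std::shared_ptr<Observer>& ob) { _observers.emplace_back(ob); }

    // Notifies every observer that is still alive and returns how many were notified.
    int notify() {
        int notified = 0;
        for (const auto& weak : _observers) {
            // lock() yields an empty shared_ptr once the observer has been destroyed.
            if (auto ob = weak.lock()) {
                ob->update();
                ++notified;
            }
        }
        return notified;
    }

private:
    std::vector<std::weak_ptr<Observer>> _observers;
};
```

One consequence of this design is that the publisher no longer keeps observers alive: some other object must own each shared_ptr, otherwise the observer expires immediately and notifications are silently skipped.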
            }
        });
    }

    if (driver->is_precondition_block()) {
        driver->set_driver_state(DriverState::PRECONDITION_BLOCK);
        driver->mark_precondition_not_ready();
The riskiest bug in this code is:
Capturing by reference ([&]) in the lambda passed to the observe_operator method can lead to dangling references.
You can modify the code like this:
void GlobalDriverExecutor::submit(DriverRawPtr driver) {
    driver->start_timers();
    if (config::enable_pipeline_event_poller) {
        driver->observe_operator([this](DriverRawPtr dr) {
            if (dr->pending_finish() && !dr->is_still_pending_finish()) {
                _blocked_driver_poller->upgrade_to_blocked_driver(dr);
                return;
            }
            auto status_or_is_not_blocked = dr->is_not_blocked();
            if (!status_or_is_not_blocked.ok() || status_or_is_not_blocked.value()) {
                _blocked_driver_poller->upgrade_to_blocked_driver(dr);
                return;
            }
        });
    }
    if (driver->is_precondition_block()) {
        driver->set_driver_state(DriverState::PRECONDITION_BLOCK);
        driver->mark_precondition_not_ready();
    }
}
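The capture concern can be illustrated in isolation. The sketch below uses a hypothetical make_callback_by_value helper, unrelated to the real observe_operator signature, to show a callback that outlives the local variable it uses and stays valid because the variable is captured by value.

```cpp
#include <functional>
#include <string>

// Returns a callback that is invoked after this function has returned.
inline std::function<std::string()> make_callback_by_value() {
    std::string name = "driver-1"; // local; destroyed when this function returns
    // Capturing by value copies `name` into the closure, so the callback owns its data.
    // A by-reference capture ([&name]) here would leave the closure with a dangling reference.
    return [name] { return name + " ready"; };
}
```

Note that capturing this, as the suggested submit() does, copies only the pointer. It avoids references to stack locals, but the callback is still safe only while the object it points to (here, the executor) outlives every registered callback.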
Signed-off-by: Seaven <[email protected]>
[Java-Extensions Incremental Coverage Report] ✅ pass : 0 / 0 (0%)
[FE Incremental Coverage Report] ✅ pass : 1 / 1 (100.00%)
[BE Incremental Coverage Report] ❌ fail : 67 / 113 (59.29%)
Why I'm doing:
What I'm doing:
Fixes #issue
What type of PR is this:
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check: