[WIP][Enhancement] Support poller event #49972

Closed
Seaven wants to merge 3 commits

@Seaven Seaven commented Aug 19, 2024

Why I'm doing:

What I'm doing:

Fixes #issue

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Does this PR entail a change in behavior?

  • Yes, this PR will result in a change in behavior.
  • No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • Parameter changes: default values, similar parameters but with different default values
  • Policy changes: use new policy to replace old one, functionality automatically enabled
  • Feature removed
  • Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This pr needs user documentation (for new or modified features or behaviors)
    • I have added documentation for my new feature or new function
  • This is a backport pr

Bugfix cherry-pick branch check:

  • I have checked the version labels which the pr will be auto-backported to the target branch
    • 3.3
    • 3.2
    • 3.1
    • 3.0
    • 2.5

@Seaven Seaven requested review from a team as code owners August 19, 2024 09:07
@Seaven Seaven marked this pull request as draft August 19, 2024 09:07
@mergify mergify bot assigned Seaven Aug 19, 2024
@@ -227,6 +320,7 @@ size_t PipelineDriverPoller::calculate_parked_driver(const ImmutableDriverPredic
}

void PipelineDriverPoller::remove_blocked_driver(DriverList& local_blocked_drivers, DriverList::iterator& driver_it) {
    _ready_count++;
    auto& driver = *driver_it;
    driver->_pending_timer->update(driver->_pending_timer_sw->elapsed_time());
    local_blocked_drivers.erase(driver_it++);

The riskiest bug in this code:
The inner while loop in backup_run_internal never advances the iterator driver_it, so the loop never terminates.

You can modify the code like this:

void PipelineDriverPoller::backup_run_internal() {
    DriverList tmp_drivers;
    while (!_is_shutdown.load(std::memory_order_acquire)) {
        _backup_count++;
        {
            std::unique_lock<std::mutex> lock(_backup_mutex);
            tmp_drivers.assign(_backup_drivers.begin(), _backup_drivers.end());
        }
        auto driver_it = tmp_drivers.begin();
        while (driver_it != tmp_drivers.end()) {
            auto* driver = *driver_it;

            if (!driver->is_query_never_expired() && driver->query_ctx()->is_query_expired()) {
                upgrade_to_blocked_driver(driver);
            } else if (driver->fragment_ctx()->is_canceled() || driver->need_report_exec_state()) {
                upgrade_to_blocked_driver(driver);
            } else if ((driver->pending_finish() && driver->is_still_pending_finish()) ||
                       (driver->is_epoch_finishing() && driver->is_still_epoch_finishing()) ||
                       driver->is_epoch_finished() || driver->is_finished()) {
                upgrade_to_blocked_driver(driver);
            } else {
                auto status_or_is_not_blocked = driver->is_not_blocked();
                if (status_or_is_not_blocked.ok() && status_or_is_not_blocked.value()) {
                    upgrade_to_blocked_driver(driver);
                }
            }
            // Increment the iterator to avoid infinite loop
            ++driver_it;
        }

        tmp_drivers.clear();
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}
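
The iterator-advance rule behind this fix can be shown in isolation. The following is a minimal sketch, not the PR's code: `FakeDriver` and `drain_ready` are hypothetical names, and it uses the iterator returned by `std::list::erase` rather than the `erase(driver_it++)` form seen in the diff. Every pass through the loop either erases or increments, so the loop always makes progress.

```cpp
#include <cassert>
#include <cstddef>
#include <list>

// Hypothetical stand-in for a pipeline driver; only the readiness flag matters here.
struct FakeDriver {
    int id;
    bool ready;
};

// Removes every ready driver from `blocked` and returns how many were removed.
inline std::size_t drain_ready(std::list<FakeDriver>& blocked) {
    const std::size_t before = blocked.size();
    std::size_t removed = 0;
    auto it = blocked.begin();
    while (it != blocked.end()) {
        if (it->ready) {
            it = blocked.erase(it); // erase() returns the iterator after the erased node
            ++removed;
        } else {
            ++it; // without this branch, a non-ready driver would make the loop spin forever
        }
    }
    // Sanity check: nothing was lost or counted twice.
    assert(blocked.size() + removed == before);
    (void)before; // silences unused-variable warnings when NDEBUG disables assert
    return removed;
}
```

With three drivers where the first and third are ready, `drain_ready` returns 2 and leaves only the second driver in the list.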

std::vector<std::shared_ptr<PipelineObserver>> _observers;
};

} // namespace starrocks::pipeline

The riskiest bug in this code:
The observer's function object may be destroyed before the publisher invokes it, leaving a dangling reference and causing undefined behavior.

You can modify the code like this:

#include <functional>
#include <memory>
#include <vector>

namespace starrocks::pipeline {
class PipelineDriver;
using ObserverFunc = std::function<void()>;

class PipelineObserver : public std::enable_shared_from_this<PipelineObserver> {
public:
    static std::shared_ptr<PipelineObserver> create(const ObserverFunc& fn) {
        return std::make_shared<PipelineObserver>(fn);
    }

    explicit PipelineObserver(const ObserverFunc& fn) : _observer_fn(fn) {}

    void update() {
        _observer_fn();
    }

private:
    ObserverFunc _observer_fn;
};

using PipelineObserverPtr = std::shared_ptr<PipelineObserver>;

class PipelinePublisher {
public:
    void attach(PipelineObserverPtr observer) { 
        _observers.emplace_back(observer); 
    }

    void notify() {
        for (const auto& ob : _observers) {
            if (auto observer = ob.lock()) { // Use weak_ptr to prevent prolonging the lifespan of observers unintentionally
                observer->update();
            }
        }
    }

private:
    std::vector<std::weak_ptr<PipelineObserver>> _observers; // Change to weak_ptr to avoid cyclic references
};

} // namespace starrocks::pipeline

Key modifications:

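  1. PipelineObserver now inherits from std::enable_shared_from_this, which lets an observer obtain a shared_ptr to itself through shared_from_this(). The snippet above does not call it yet.
  2. Changed std::shared_ptr<PipelineObserver> to std::weak_ptr<PipelineObserver> in _observers, so the publisher no longer extends observer lifetimes or participates in reference cycles. Whoever attaches an observer must keep its shared_ptr alive.
  3. notify() calls lock() on each weak pointer and skips observers that have already been destroyed.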
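
The effect of the weak_ptr change can be checked with a small standalone sketch. `Observer` and `Publisher` below are hypothetical, simplified stand-ins for the classes in the suggestion, and the return value of `notify()` is added only to make the behavior observable. It assumes the caller owns each observer through a `std::shared_ptr`.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical, simplified observer: wraps a callback and runs it on update().
class Observer {
public:
    explicit Observer(std::function<void()> fn) : _fn(std::move(fn)) {
        assert(_fn != nullptr); // an empty callback would throw on update()
    }
    void update() { _fn(); }

private:
    std::function<void()> _fn;
};

// Hypothetical publisher that holds non-owning references to its observers.
class Publisher {
public:
    void attach(const std::shared_ptr<Observer>& ob) { _observers.emplace_back(ob); }

    // Notifies the observers that are still alive and returns how many were called.
    int notify() {
        int called = 0;
        for (const auto& weak : _observers) {
            if (auto ob = weak.lock()) { // empty when the observer was already destroyed
                ob->update();
                ++called;
            }
        }
        return called;
    }

private:
    std::vector<std::weak_ptr<Observer>> _observers;
};
```

One consequence of this design: an observer attached from a temporary `shared_ptr` is silently skipped once that pointer goes away, so the attaching side has to hold the owning pointer for as long as it wants notifications.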

            }
        });
    }

    if (driver->is_precondition_block()) {
        driver->set_driver_state(DriverState::PRECONDITION_BLOCK);
        driver->mark_precondition_not_ready();

The riskiest bug in this code:
The lambda passed to observe_operator captures by reference ([&]), which can leave dangling references if the lambda outlives the captured variables.
You can modify the code like this:

void GlobalDriverExecutor::submit(DriverRawPtr driver) {
    driver->start_timers();

    if (config::enable_pipeline_event_poller) {
        driver->observe_operator([this](DriverRawPtr dr) {
            if (dr->pending_finish() && !dr->is_still_pending_finish()) {
                _blocked_driver_poller->upgrade_to_blocked_driver(dr);
                return;
            }
            auto status_or_is_not_blocked = dr->is_not_blocked();
            if (!status_or_is_not_blocked.ok() || status_or_is_not_blocked.value()) {
                _blocked_driver_poller->upgrade_to_blocked_driver(dr);
                return;
            }
        });
    }

    if (driver->is_precondition_block()) {
        driver->set_driver_state(DriverState::PRECONDITION_BLOCK);
        driver->mark_precondition_not_ready();
    }
}
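
The capture rule behind this suggestion can be illustrated without the executor. The following is a minimal sketch with a hypothetical helper, `make_labeler`, that returns a callback which outlives the function's local variables. Capturing by value keeps the closure self-contained. Capturing `this`, as the suggestion above does, is safe only on the assumption that the executor outlives every stored callback.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical helper: builds a callback that is used after this function returns.
inline std::function<std::string()> make_labeler(int driver_id) {
    assert(driver_id >= 0); // hypothetical precondition for this sketch
    std::string prefix = "driver-";
    // Capture by value: the closure owns its own copies of prefix and driver_id.
    // With [&] instead, both captures would refer to locals that are destroyed
    // when make_labeler returns, and calling the callback would be undefined behavior.
    return [prefix, driver_id] { return prefix + std::to_string(driver_id); };
}
```

Calling `make_labeler(7)` and invoking the result yields the string `driver-7`, even though `prefix` no longer exists in the caller's scope.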

Signed-off-by: Seaven <[email protected]>

[Java-Extensions Incremental Coverage Report]

pass : 0 / 0 (0%)

[FE Incremental Coverage Report]

pass : 1 / 1 (100.00%)

file detail

| path | covered_line | new_line | coverage | not_covered_line_detail |
|---|---|---|---|---|
| 🔵 com/starrocks/server/GlobalStateMgr.java | 1 | 1 | 100.00% | [] |

[BE Incremental Coverage Report]

fail : 67 / 113 (59.29%)

file detail

| path | covered_line | new_line | coverage | not_covered_line_detail |
|---|---|---|---|---|
| 🔵 src/exec/pipeline/aggregate/aggregate_blocking_sink_operator.cpp | 0 | 1 | 00.00% | [38] |
| 🔵 src/exec/pipeline/scan/olap_scan_context.h | 0 | 1 | 00.00% | [131] |
| 🔵 src/exec/pipeline/aggregate/aggregate_blocking_source_operator.cpp | 0 | 5 | 00.00% | [27, 28, 31, 41, 43] |
| 🔵 src/exec/pipeline/aggregate/aggregate_streaming_source_operator.cpp | 0 | 2 | 00.00% | [28, 65] |
| 🔵 src/exec/pipeline/scan/olap_scan_prepare_operator.cpp | 0 | 1 | 00.00% | [88] |
| 🔵 src/exec/aggregator.h | 0 | 3 | 00.00% | [286, 287, 288] |
| 🔵 src/exec/pipeline/exchange/sink_buffer.cpp | 0 | 10 | 00.00% | [108, 110, 111, 283, 286, 382, 383, 384, 402, 403] |
| 🔵 src/runtime/data_stream_recvr.cpp | 0 | 5 | 00.00% | [243, 246, 265, 272, 316] |
| 🔵 src/exec/pipeline/scan/olap_scan_operator.cpp | 0 | 2 | 00.00% | [98, 137] |
| 🔵 src/exec/pipeline/exchange/local_exchange_source_operator.cpp | 1 | 6 | 16.67% | [75, 149, 150, 190, 191] |
| 🔵 src/exec/aggregator.cpp | 3 | 5 | 60.00% | [601, 892] |
| 🔵 src/exec/pipeline/scan/scan_operator.cpp | 7 | 9 | 77.78% | [245, 508] |
| 🔵 src/exec/pipeline/pipeline_driver_executor.cpp | 8 | 10 | 80.00% | [268, 269] |
| 🔵 src/exec/pipeline/pipeline_driver_poller.cpp | 43 | 48 | 89.58% | [217, 245, 246, 247, 248] |
| 🔵 src/exec/pipeline/context_with_dependency.h | 1 | 1 | 100.00% | [] |
| 🔵 src/exec/pipeline/pipeline_driver.cpp | 4 | 4 | 100.00% | [] |

Signed-off-by: Seaven <[email protected]>