From 8d6abcf265185afd93049f4eb94b6d78f2ec20bb Mon Sep 17 00:00:00 2001 From: stdpain Date: Tue, 24 Dec 2024 20:15:36 +0800 Subject: [PATCH] add more case Signed-off-by: stdpain --- be/src/exec/pipeline/empty_set_operator.h | 2 + be/src/exec/pipeline/noop_sink_operator.h | 2 + .../exec/pipeline/schedule/observer_test.cpp | 60 ++++++------------- 3 files changed, 21 insertions(+), 43 deletions(-) diff --git a/be/src/exec/pipeline/empty_set_operator.h b/be/src/exec/pipeline/empty_set_operator.h index bef6fdadcf01e..c002e5e99ddca 100644 --- a/be/src/exec/pipeline/empty_set_operator.h +++ b/be/src/exec/pipeline/empty_set_operator.h @@ -41,6 +41,8 @@ class EmptySetOperatorFactory final : public SourceOperatorFactory { ~EmptySetOperatorFactory() override = default; + bool support_event_scheduler() const override { return true; } + OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override { return std::make_shared(this, _id, _plan_node_id, driver_sequence); } diff --git a/be/src/exec/pipeline/noop_sink_operator.h b/be/src/exec/pipeline/noop_sink_operator.h index 57d1cbb78a64a..1f11741204f00 100644 --- a/be/src/exec/pipeline/noop_sink_operator.h +++ b/be/src/exec/pipeline/noop_sink_operator.h @@ -55,6 +55,8 @@ class NoopSinkOperatorFactory final : public OperatorFactory { ~NoopSinkOperatorFactory() override = default; + bool support_event_scheduler() const override { return true; } + OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override { return std::make_shared(this, _id, _plan_node_id, driver_sequence); } diff --git a/be/test/exec/pipeline/schedule/observer_test.cpp b/be/test/exec/pipeline/schedule/observer_test.cpp index d7fd9759c2896..366da1ac6567c 100644 --- a/be/test/exec/pipeline/schedule/observer_test.cpp +++ b/be/test/exec/pipeline/schedule/observer_test.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "exec/pipeline/schedule/observer.h" + #include #include @@ -35,6 +37,9 @@ #include "testutil/assert.h" #include "util/runtime_profile.h" +#pragma GCC push_options +#pragma GCC optimize("no-inline") + namespace starrocks::pipeline { TEST(RequestCntlTest, test) { { @@ -191,33 +196,10 @@ TEST_F(PipelineObserverTest, basic_test) { factories.emplace_back(std::make_shared(0, 1)); factories.emplace_back(std::make_shared(2, 3)); - Pipeline pipeline(0, factories, _exec_group.get()); - auto operators = pipeline.create_operators(1, 0); - PipelineDriver driver(operators, _dummy_query_ctx.get(), _dummy_fragment_ctx.get(), &pipeline, 1); - driver.assign_observer(); - ASSERT_OK(driver.prepare(_dummy_fragment_ctx->runtime_state())); - - auto driver_queue = std::make_unique(); - _dummy_fragment_ctx->init_event_scheduler(); - _dummy_fragment_ctx->event_scheduler()->attach_queue(driver_queue.get()); - - driver.set_in_block_queue(true); - driver.set_driver_state(DriverState::INPUT_EMPTY); - // test notify - driver.observer()->all_update(); - ASSERT_OK(driver_queue->take(false)); - driver.observer()->cancel_update(); - ASSERT_OK(driver_queue->take(false)); - driver.observer()->sink_update(); - ASSERT_OK(driver_queue->take(false)); - driver.observer()->source_update(); - driver.observer()->source_update(); -} + for (auto& factory : factories) { + ASSERT_TRUE(factory->support_event_scheduler()); + } -TEST_F(PipelineObserverTest, basic_test2) { - OpFactories factories; - factories.emplace_back(std::make_shared(0, 1)); - factories.emplace_back(std::make_shared(2, 3)); SimpleTestContext tx(factories, _exec_group.get(), _dummy_fragment_ctx.get(), _dummy_query_ctx.get()); ASSERT_OK(tx.driver->prepare(_runtime_state.get())); const auto& driver = tx.driver; @@ -236,31 +218,23 @@ TEST_F(PipelineObserverTest, basic_test2) { driver->observer()->source_update(); } -TEST_F(PipelineObserverTest, race_scheduler_with_observer) { +TEST_F(PipelineObserverTest, test_obs) { OpFactories factories; factories.emplace_back(std::make_shared(0, 1)); factories.emplace_back(std::make_shared(2, 3)); SimpleTestContext tx(factories, _exec_group.get(), _dummy_fragment_ctx.get(), _dummy_query_ctx.get()); ASSERT_OK(tx.driver->prepare(_runtime_state.get())); const auto& driver = tx.driver; - const auto& driver_queue = tx.driver_queue; - - driver->set_in_block_queue(false); - driver->set_driver_state(DriverState::INPUT_EMPTY); - driver->observer()->sink_update(); - _dummy_fragment_ctx->event_scheduler()->add_blocked_driver(driver.get()); driver->set_in_block_queue(true); driver->set_driver_state(DriverState::INPUT_EMPTY); - // test notify - driver->observer()->all_update(); - ASSERT_OK(driver_queue->take(false)); - driver->observer()->cancel_update(); - ASSERT_OK(driver_queue->take(false)); - driver->observer()->sink_update(); - ASSERT_OK(driver_queue->take(false)); - driver->observer()->source_update(); - driver->observer()->source_update(); + Observable obs; + _runtime_state->set_enable_event_scheduler(true); + obs.add_observer(_runtime_state.get(), driver->observer()); + ASSERT_GT(obs.to_string().size(), 0); + obs.notify_sink_observers(); + obs.notify_source_observers(); } -} // namespace starrocks::pipeline \ No newline at end of file +} // namespace starrocks::pipeline +#pragma GCC pop_options