Skip to content

Commit

Permalink
add more case
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <[email protected]>
  • Loading branch information
stdpain committed Dec 24, 2024
1 parent 7791a7d commit 8d6abcf
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 43 deletions.
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/empty_set_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class EmptySetOperatorFactory final : public SourceOperatorFactory {

~EmptySetOperatorFactory() override = default;

bool support_event_scheduler() const override { return true; }

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
return std::make_shared<EmptySetOperator>(this, _id, _plan_node_id, driver_sequence);
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/noop_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class NoopSinkOperatorFactory final : public OperatorFactory {

~NoopSinkOperatorFactory() override = default;

bool support_event_scheduler() const override { return true; }

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
return std::make_shared<NoopSinkOperator>(this, _id, _plan_node_id, driver_sequence);
}
Expand Down
60 changes: 17 additions & 43 deletions be/test/exec/pipeline/schedule/observer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/pipeline/schedule/observer.h"

#include <bits/types/struct_timespec.h>
#include <unistd.h>

Expand All @@ -35,6 +37,9 @@
#include "testutil/assert.h"
#include "util/runtime_profile.h"

#pragma GCC push_options
#pragma GCC optimize("no-inline")

namespace starrocks::pipeline {
TEST(RequestCntlTest, test) {
{
Expand Down Expand Up @@ -191,33 +196,10 @@ TEST_F(PipelineObserverTest, basic_test) {
factories.emplace_back(std::make_shared<EmptySetOperatorFactory>(0, 1));
factories.emplace_back(std::make_shared<NoopSinkOperatorFactory>(2, 3));

Pipeline pipeline(0, factories, _exec_group.get());
auto operators = pipeline.create_operators(1, 0);
PipelineDriver driver(operators, _dummy_query_ctx.get(), _dummy_fragment_ctx.get(), &pipeline, 1);
driver.assign_observer();
ASSERT_OK(driver.prepare(_dummy_fragment_ctx->runtime_state()));

auto driver_queue = std::make_unique<QuerySharedDriverQueue>();
_dummy_fragment_ctx->init_event_scheduler();
_dummy_fragment_ctx->event_scheduler()->attach_queue(driver_queue.get());

driver.set_in_block_queue(true);
driver.set_driver_state(DriverState::INPUT_EMPTY);
// test notify
driver.observer()->all_update();
ASSERT_OK(driver_queue->take(false));
driver.observer()->cancel_update();
ASSERT_OK(driver_queue->take(false));
driver.observer()->sink_update();
ASSERT_OK(driver_queue->take(false));
driver.observer()->source_update();
driver.observer()->source_update();
}
for (auto& factory : factories) {
ASSERT_TRUE(factory->support_event_scheduler());
}

TEST_F(PipelineObserverTest, basic_test2) {
OpFactories factories;
factories.emplace_back(std::make_shared<EmptySetOperatorFactory>(0, 1));
factories.emplace_back(std::make_shared<NoopSinkOperatorFactory>(2, 3));
SimpleTestContext tx(factories, _exec_group.get(), _dummy_fragment_ctx.get(), _dummy_query_ctx.get());
ASSERT_OK(tx.driver->prepare(_runtime_state.get()));
const auto& driver = tx.driver;
Expand All @@ -236,31 +218,23 @@ TEST_F(PipelineObserverTest, basic_test2) {
driver->observer()->source_update();
}

TEST_F(PipelineObserverTest, race_scheduler_with_observer) {
TEST_F(PipelineObserverTest, test_obs) {
OpFactories factories;
factories.emplace_back(std::make_shared<EmptySetOperatorFactory>(0, 1));
factories.emplace_back(std::make_shared<NoopSinkOperatorFactory>(2, 3));
SimpleTestContext tx(factories, _exec_group.get(), _dummy_fragment_ctx.get(), _dummy_query_ctx.get());
ASSERT_OK(tx.driver->prepare(_runtime_state.get()));
const auto& driver = tx.driver;
const auto& driver_queue = tx.driver_queue;

driver->set_in_block_queue(false);
driver->set_driver_state(DriverState::INPUT_EMPTY);
driver->observer()->sink_update();
_dummy_fragment_ctx->event_scheduler()->add_blocked_driver(driver.get());

driver->set_in_block_queue(true);
driver->set_driver_state(DriverState::INPUT_EMPTY);
// test notify
driver->observer()->all_update();
ASSERT_OK(driver_queue->take(false));
driver->observer()->cancel_update();
ASSERT_OK(driver_queue->take(false));
driver->observer()->sink_update();
ASSERT_OK(driver_queue->take(false));
driver->observer()->source_update();
driver->observer()->source_update();
Observable obs;
_runtime_state->set_enable_event_scheduler(true);
obs.add_observer(_runtime_state.get(), driver->observer());
ASSERT_GT(obs.to_string().size(), 0);
obs.notify_sink_observers();
obs.notify_source_observers();
}

} // namespace starrocks::pipeline
} // namespace starrocks::pipeline
#pragma GCC pop_options

0 comments on commit 8d6abcf

Please sign in to comment.