From 70197e1e2b2fffd55b1032200aafbc4337a01042 Mon Sep 17 00:00:00 2001 From: "zihe.liu" Date: Tue, 5 Nov 2024 14:55:50 +0800 Subject: [PATCH] [BugFix] Fix pipeline metrics after resource group (#52508) Signed-off-by: zihe.liu (cherry picked from commit 94e69a06b41839994846fc6e4cbc1a1f9abe9521) --- .../pipeline/pipeline_driver_executor.cpp | 15 ++++++------- .../exec/pipeline/pipeline_driver_executor.h | 11 ++++++++++ be/src/exec/workgroup/scan_executor.cpp | 13 ++++++------ be/src/exec/workgroup/scan_executor.h | 5 +++-- be/src/runtime/exec_env.cpp | 21 +++++++++++++++++++ be/src/util/starrocks_metrics.cpp | 6 ++++++ .../exec/workgroup/scan_task_queue_test.cpp | 2 +- 7 files changed, 56 insertions(+), 17 deletions(-) diff --git a/be/src/exec/pipeline/pipeline_driver_executor.cpp b/be/src/exec/pipeline/pipeline_driver_executor.cpp index fafa67329b6bb..00c95229ec699 100644 --- a/be/src/exec/pipeline/pipeline_driver_executor.cpp +++ b/be/src/exec/pipeline/pipeline_driver_executor.cpp @@ -37,13 +37,7 @@ GlobalDriverExecutor::GlobalDriverExecutor(const std::string& name, std::unique_ _thread_pool(std::move(thread_pool)), _blocked_driver_poller(new PipelineDriverPoller(name, _driver_queue.get(), cpuids)), _exec_state_reporter(new ExecStateReporter(cpuids)), - _audit_statistics_reporter(new AuditStatisticsReporter()) { - REGISTER_GAUGE_STARROCKS_METRIC(pipe_driver_schedule_count, [this]() { return _schedule_count.load(); }); - REGISTER_GAUGE_STARROCKS_METRIC(pipe_driver_execution_time, [this]() { return _driver_execution_ns.load(); }); - REGISTER_GAUGE_STARROCKS_METRIC(pipe_driver_queue_len, [this]() { return _driver_queue->size(); }); - REGISTER_GAUGE_STARROCKS_METRIC(pipe_poller_block_queue_len, - [this] { return _blocked_driver_poller->num_drivers(); }); -} + _audit_statistics_reporter(new AuditStatisticsReporter()) {} void GlobalDriverExecutor::close() { _driver_queue->close(); @@ -59,6 +53,13 @@ void GlobalDriverExecutor::initialize(int num_threads) { } } +DriverExecutorMetrics GlobalDriverExecutor::metrics() const { + return {.schedule_count = _schedule_count.load(), + .driver_execution_ns = _driver_execution_ns.load(), + .driver_queue_len = static_cast(_driver_queue->size()), + .driver_poller_block_queue_len = static_cast(_blocked_driver_poller->num_drivers())}; +} + void GlobalDriverExecutor::change_num_threads(int32_t num_threads) { int32_t old_num_threads = 0; if (!_num_threads_setter.adjust_expect_num(num_threads, &old_num_threads)) { diff --git a/be/src/exec/pipeline/pipeline_driver_executor.h b/be/src/exec/pipeline/pipeline_driver_executor.h index 3a0be6a6116a0..e17e50eeed20f 100644 --- a/be/src/exec/pipeline/pipeline_driver_executor.h +++ b/be/src/exec/pipeline/pipeline_driver_executor.h @@ -34,6 +34,13 @@ namespace starrocks::pipeline { class DriverExecutor; using DriverExecutorPtr = std::shared_ptr; +struct DriverExecutorMetrics { + int64_t schedule_count; + int64_t driver_execution_ns; + int64_t driver_queue_len; + int64_t driver_poller_block_queue_len; +}; + class DriverExecutor { public: DriverExecutor(std::string name) : _name(std::move(name)) {} @@ -65,6 +72,8 @@ class DriverExecutor { virtual void bind_cpus(const CpuUtil::CpuIds& cpuids, const std::vector& borrowed_cpuids) = 0; + virtual DriverExecutorMetrics metrics() const = 0; + protected: std::string _name; }; @@ -102,6 +111,8 @@ class GlobalDriverExecutor final : public FactoryMethod thread_pool, std::unique_ptr task_queue, - bool add_metrics) - : _task_queue(std::move(task_queue)), _thread_pool(std::move(thread_pool)) { - if (add_metrics) { - REGISTER_GAUGE_STARROCKS_METRIC(pipe_scan_executor_queuing, [this]() { return _task_queue->size(); }); - } -} +ScanExecutor::ScanExecutor(std::unique_ptr thread_pool, std::unique_ptr task_queue) + : _task_queue(std::move(task_queue)), _thread_pool(std::move(thread_pool)) {} void ScanExecutor::close() { _task_queue->close(); @@ -99,4 +94,8 @@ void ScanExecutor::bind_cpus(const CpuUtil::CpuIds& cpuids, const std::vectorbind_cpus(cpuids, borrowed_cpuids); } +int64_t ScanExecutor::num_tasks() const { + return _task_queue->size(); +} + } // namespace starrocks::workgroup diff --git a/be/src/exec/workgroup/scan_executor.h b/be/src/exec/workgroup/scan_executor.h index ab01952ef5435..8cc2a8b6dcc2f 100644 --- a/be/src/exec/workgroup/scan_executor.h +++ b/be/src/exec/workgroup/scan_executor.h @@ -27,8 +27,7 @@ class ScanTaskQueue; class ScanExecutor { public: - explicit ScanExecutor(std::unique_ptr thread_pool, std::unique_ptr task_queue, - bool add_metrics = true); + explicit ScanExecutor(std::unique_ptr thread_pool, std::unique_ptr task_queue); virtual ~ScanExecutor() = default; void initialize(int32_t num_threads); @@ -41,6 +40,8 @@ class ScanExecutor { void bind_cpus(const CpuUtil::CpuIds& cpuids, const std::vector& borrowed_cpuids); + int64_t num_tasks() const; + private: void worker_thread(); diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index 72a02a8013e73..b75874310dd33 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -446,6 +446,27 @@ Status ExecEnv::init(const std::vector& store_paths, bool as_cn) { _workgroup_manager = std::make_unique(std::move(executors_manager_opts)); RETURN_IF_ERROR(_workgroup_manager->start()); + StarRocksMetrics::instance()->metrics()->register_hook("pipe_execution_hook", [this] { + int64_t driver_schedule_count = 0; + int64_t driver_execution_ns = 0; + int64_t driver_queue_len = 0; + int64_t driver_poller_block_queue_len = 0; + int64_t scan_executor_queuing = 0; + _workgroup_manager->for_each_executors([&](const workgroup::PipelineExecutorSet& executors) { + const auto metrics = executors.driver_executor()->metrics(); + driver_schedule_count += metrics.schedule_count; + driver_execution_ns += metrics.driver_execution_ns; + driver_queue_len += metrics.driver_queue_len; + driver_poller_block_queue_len += metrics.driver_poller_block_queue_len; + scan_executor_queuing += executors.scan_executor()->num_tasks(); + }); + StarRocksMetrics::instance()->pipe_driver_schedule_count.set_value(driver_schedule_count); + StarRocksMetrics::instance()->pipe_driver_execution_time.set_value(driver_execution_ns); + StarRocksMetrics::instance()->pipe_driver_queue_len.set_value(driver_queue_len); + StarRocksMetrics::instance()->pipe_poller_block_queue_len.set_value(driver_poller_block_queue_len); + StarRocksMetrics::instance()->pipe_scan_executor_queuing.set_value(scan_executor_queuing); + }); + workgroup::DefaultWorkGroupInitialization default_workgroup_init; if (store_paths.empty() && as_cn) { diff --git a/be/src/util/starrocks_metrics.cpp b/be/src/util/starrocks_metrics.cpp index 41afb5c81ee5c..92a42f8a4ea74 100644 --- a/be/src/util/starrocks_metrics.cpp +++ b/be/src/util/starrocks_metrics.cpp @@ -54,6 +54,12 @@ StarRocksMetrics::StarRocksMetrics() : _metrics(_s_registry_name) { REGISTER_STARROCKS_METRIC(query_scan_bytes); REGISTER_STARROCKS_METRIC(query_scan_rows); + REGISTER_STARROCKS_METRIC(pipe_scan_executor_queuing); + REGISTER_STARROCKS_METRIC(pipe_driver_schedule_count); + REGISTER_STARROCKS_METRIC(pipe_driver_execution_time); + REGISTER_STARROCKS_METRIC(pipe_driver_queue_len); + REGISTER_STARROCKS_METRIC(pipe_poller_block_queue_len); + REGISTER_STARROCKS_METRIC(load_channel_add_chunks_total); REGISTER_STARROCKS_METRIC(load_channel_add_chunks_duration_us); REGISTER_STARROCKS_METRIC(load_channel_add_chunks_wait_memtable_duration_us); diff --git a/be/test/exec/workgroup/scan_task_queue_test.cpp b/be/test/exec/workgroup/scan_task_queue_test.cpp index 2c46fa11ee9e5..223df792d105b 100644 --- a/be/test/exec/workgroup/scan_task_queue_test.cpp +++ b/be/test/exec/workgroup/scan_task_queue_test.cpp @@ -39,7 +39,7 @@ PARALLEL_TEST(ScanExecutorTest, test_yield) { .set_max_threads(4) .set_max_queue_size(100) .build(&thread_pool)); - auto executor = std::make_unique(std::move(thread_pool), std::move(queue), false); + auto executor = std::make_unique(std::move(thread_pool), std::move(queue)); DeferOp op([&]() { executor->close(); }); executor->initialize(4);