Skip to content

Commit

Permalink
[BugFix] Fix pipeline metrics after resource group (#52508)
Browse files Browse the repository at this point in the history
Signed-off-by: zihe.liu <[email protected]>
(cherry picked from commit 94e69a0)
  • Loading branch information
ZiheLiu authored and mergify[bot] committed Nov 5, 2024
1 parent a68f381 commit 70197e1
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 17 deletions.
15 changes: 8 additions & 7 deletions be/src/exec/pipeline/pipeline_driver_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,7 @@ GlobalDriverExecutor::GlobalDriverExecutor(const std::string& name, std::unique_
_thread_pool(std::move(thread_pool)),
_blocked_driver_poller(new PipelineDriverPoller(name, _driver_queue.get(), cpuids)),
_exec_state_reporter(new ExecStateReporter(cpuids)),
_audit_statistics_reporter(new AuditStatisticsReporter()) {
REGISTER_GAUGE_STARROCKS_METRIC(pipe_driver_schedule_count, [this]() { return _schedule_count.load(); });
REGISTER_GAUGE_STARROCKS_METRIC(pipe_driver_execution_time, [this]() { return _driver_execution_ns.load(); });
REGISTER_GAUGE_STARROCKS_METRIC(pipe_driver_queue_len, [this]() { return _driver_queue->size(); });
REGISTER_GAUGE_STARROCKS_METRIC(pipe_poller_block_queue_len,
[this] { return _blocked_driver_poller->num_drivers(); });
}
_audit_statistics_reporter(new AuditStatisticsReporter()) {}

void GlobalDriverExecutor::close() {
_driver_queue->close();
Expand All @@ -59,6 +53,13 @@ void GlobalDriverExecutor::initialize(int num_threads) {
}
}

DriverExecutorMetrics GlobalDriverExecutor::metrics() const {
return {.schedule_count = _schedule_count.load(),
.driver_execution_ns = _driver_execution_ns.load(),
.driver_queue_len = static_cast<int64_t>(_driver_queue->size()),
.driver_poller_block_queue_len = static_cast<int64_t>(_blocked_driver_poller->num_drivers())};
}

void GlobalDriverExecutor::change_num_threads(int32_t num_threads) {
int32_t old_num_threads = 0;
if (!_num_threads_setter.adjust_expect_num(num_threads, &old_num_threads)) {
Expand Down
11 changes: 11 additions & 0 deletions be/src/exec/pipeline/pipeline_driver_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ namespace starrocks::pipeline {
class DriverExecutor;
using DriverExecutorPtr = std::shared_ptr<DriverExecutor>;

struct DriverExecutorMetrics {
int64_t schedule_count;
int64_t driver_execution_ns;
int64_t driver_queue_len;
int64_t driver_poller_block_queue_len;
};

class DriverExecutor {
public:
DriverExecutor(std::string name) : _name(std::move(name)) {}
Expand Down Expand Up @@ -65,6 +72,8 @@ class DriverExecutor {

virtual void bind_cpus(const CpuUtil::CpuIds& cpuids, const std::vector<CpuUtil::CpuIds>& borrowed_cpuids) = 0;

virtual DriverExecutorMetrics metrics() const = 0;

protected:
std::string _name;
};
Expand Down Expand Up @@ -102,6 +111,8 @@ class GlobalDriverExecutor final : public FactoryMethod<DriverExecutor, GlobalDr

void _finalize_epoch(DriverRawPtr driver, RuntimeState* runtime_state, DriverState state);

DriverExecutorMetrics metrics() const override;

private:
// The maximum duration that a driver could stay in local_driver_queue
static constexpr int64_t LOCAL_MAX_WAIT_TIME_SPENT_NS = 1'000'000L;
Expand Down
13 changes: 6 additions & 7 deletions be/src/exec/workgroup/scan_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,8 @@

namespace starrocks::workgroup {

ScanExecutor::ScanExecutor(std::unique_ptr<ThreadPool> thread_pool, std::unique_ptr<ScanTaskQueue> task_queue,
bool add_metrics)
: _task_queue(std::move(task_queue)), _thread_pool(std::move(thread_pool)) {
if (add_metrics) {
REGISTER_GAUGE_STARROCKS_METRIC(pipe_scan_executor_queuing, [this]() { return _task_queue->size(); });
}
}
ScanExecutor::ScanExecutor(std::unique_ptr<ThreadPool> thread_pool, std::unique_ptr<ScanTaskQueue> task_queue)
: _task_queue(std::move(task_queue)), _thread_pool(std::move(thread_pool)) {}

void ScanExecutor::close() {
_task_queue->close();
Expand Down Expand Up @@ -99,4 +94,8 @@ void ScanExecutor::bind_cpus(const CpuUtil::CpuIds& cpuids, const std::vector<Cp
_thread_pool->bind_cpus(cpuids, borrowed_cpuids);
}

int64_t ScanExecutor::num_tasks() const {
return _task_queue->size();
}

} // namespace starrocks::workgroup
5 changes: 3 additions & 2 deletions be/src/exec/workgroup/scan_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ class ScanTaskQueue;

class ScanExecutor {
public:
explicit ScanExecutor(std::unique_ptr<ThreadPool> thread_pool, std::unique_ptr<ScanTaskQueue> task_queue,
bool add_metrics = true);
explicit ScanExecutor(std::unique_ptr<ThreadPool> thread_pool, std::unique_ptr<ScanTaskQueue> task_queue);
virtual ~ScanExecutor() = default;

void initialize(int32_t num_threads);
Expand All @@ -41,6 +40,8 @@ class ScanExecutor {

void bind_cpus(const CpuUtil::CpuIds& cpuids, const std::vector<CpuUtil::CpuIds>& borrowed_cpuids);

int64_t num_tasks() const;

private:
void worker_thread();

Expand Down
21 changes: 21 additions & 0 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,27 @@ Status ExecEnv::init(const std::vector<StorePath>& store_paths, bool as_cn) {
_workgroup_manager = std::make_unique<workgroup::WorkGroupManager>(std::move(executors_manager_opts));
RETURN_IF_ERROR(_workgroup_manager->start());

StarRocksMetrics::instance()->metrics()->register_hook("pipe_execution_hook", [this] {
int64_t driver_schedule_count = 0;
int64_t driver_execution_ns = 0;
int64_t driver_queue_len = 0;
int64_t driver_poller_block_queue_len = 0;
int64_t scan_executor_queuing = 0;
_workgroup_manager->for_each_executors([&](const workgroup::PipelineExecutorSet& executors) {
const auto metrics = executors.driver_executor()->metrics();
driver_schedule_count += metrics.schedule_count;
driver_execution_ns += metrics.driver_execution_ns;
driver_queue_len += metrics.driver_queue_len;
driver_poller_block_queue_len += metrics.driver_poller_block_queue_len;
scan_executor_queuing += executors.scan_executor()->num_tasks();
});
StarRocksMetrics::instance()->pipe_driver_schedule_count.set_value(driver_schedule_count);
StarRocksMetrics::instance()->pipe_driver_execution_time.set_value(driver_execution_ns);
StarRocksMetrics::instance()->pipe_driver_queue_len.set_value(driver_queue_len);
StarRocksMetrics::instance()->pipe_poller_block_queue_len.set_value(driver_poller_block_queue_len);
StarRocksMetrics::instance()->pipe_scan_executor_queuing.set_value(scan_executor_queuing);
});

workgroup::DefaultWorkGroupInitialization default_workgroup_init;

if (store_paths.empty() && as_cn) {
Expand Down
6 changes: 6 additions & 0 deletions be/src/util/starrocks_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ StarRocksMetrics::StarRocksMetrics() : _metrics(_s_registry_name) {
REGISTER_STARROCKS_METRIC(query_scan_bytes);
REGISTER_STARROCKS_METRIC(query_scan_rows);

REGISTER_STARROCKS_METRIC(pipe_scan_executor_queuing);
REGISTER_STARROCKS_METRIC(pipe_driver_schedule_count);
REGISTER_STARROCKS_METRIC(pipe_driver_execution_time);
REGISTER_STARROCKS_METRIC(pipe_driver_queue_len);
REGISTER_STARROCKS_METRIC(pipe_poller_block_queue_len);

REGISTER_STARROCKS_METRIC(load_channel_add_chunks_total);
REGISTER_STARROCKS_METRIC(load_channel_add_chunks_duration_us);
REGISTER_STARROCKS_METRIC(load_channel_add_chunks_wait_memtable_duration_us);
Expand Down
2 changes: 1 addition & 1 deletion be/test/exec/workgroup/scan_task_queue_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ PARALLEL_TEST(ScanExecutorTest, test_yield) {
.set_max_threads(4)
.set_max_queue_size(100)
.build(&thread_pool));
auto executor = std::make_unique<ScanExecutor>(std::move(thread_pool), std::move(queue), false);
auto executor = std::make_unique<ScanExecutor>(std::move(thread_pool), std::move(queue));
DeferOp op([&]() { executor->close(); });
executor->initialize(4);

Expand Down

0 comments on commit 70197e1

Please sign in to comment.