Skip to content

Commit

Permalink
[Enhancement] add passthrough buffer mem tracker (#52404)
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <[email protected]>
(cherry picked from commit 008ec47)

Signed-off-by: stdpain <[email protected]>
  • Loading branch information
stdpain committed Nov 11, 2024
1 parent 5c9fa21 commit 94eb267
Show file tree
Hide file tree
Showing 9 changed files with 18 additions and 3 deletions.
5 changes: 3 additions & 2 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,15 @@ void calculate_metrics(void* arg_this) {
LOG(INFO) << fmt::format(
"Current memory statistics: process({}), query_pool({}), load({}), "
"metadata({}), compaction({}), schema_change({}), column_pool({}), "
"page_cache({}), update({}), chunk_allocator({}), clone({}), consistency({}), "
"page_cache({}), update({}), chunk_allocator({}), passthrough({}), clone({}), consistency({}), "
"datacache({}), jit({})",
mem_metrics->process_mem_bytes.value(), mem_metrics->query_mem_bytes.value(),
mem_metrics->load_mem_bytes.value(), mem_metrics->metadata_mem_bytes.value(),
mem_metrics->compaction_mem_bytes.value(), mem_metrics->schema_change_mem_bytes.value(),
mem_metrics->column_pool_mem_bytes.value(), mem_metrics->storage_page_cache_mem_bytes.value(),
mem_metrics->update_mem_bytes.value(), mem_metrics->chunk_allocator_mem_bytes.value(),
mem_metrics->clone_mem_bytes.value(), mem_metrics->consistency_mem_bytes.value(), datacache_mem_bytes,
mem_metrics->passthrough_mem_bytes.value(), mem_metrics->clone_mem_bytes.value(),
mem_metrics->consistency_mem_bytes.value(), datacache_mem_bytes,
mem_metrics->jit_cache_mem_bytes.value());

nap_sleep(15, [daemon] { return daemon->stopped(); });
Expand Down
3 changes: 3 additions & 0 deletions be/src/http/default_path_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ void mem_tracker_handler(MemTracker* mem_tracker, const WebPageHandler::Argument
} else if (iter->second == "chunk_allocator") {
start_mem_tracker = GlobalEnv::GetInstance()->chunk_allocator_mem_tracker();
cur_level = 2;
} else if (iter->second == "passthrough") {
start_mem_tracker = GlobalEnv::GetInstance()->passthrough_mem_tracker();
cur_level = 2;
} else if (iter->second == "consistency") {
start_mem_tracker = GlobalEnv::GetInstance()->consistency_mem_tracker();
cur_level = 2;
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ Status GlobalEnv::_init_mem_tracker() {
int32_t update_mem_percent = std::max(std::min(100, config::update_memory_limit_percent), 0);
_update_mem_tracker = regist_tracker(bytes_limit * update_mem_percent / 100, "update", nullptr);
_chunk_allocator_mem_tracker = regist_tracker(-1, "chunk_allocator", _process_mem_tracker.get());
_passthrough_mem_tracker = regist_tracker(MemTracker::PASSTHROUGH, -1, "passthrough");
_clone_mem_tracker = regist_tracker(-1, "clone", _process_mem_tracker.get());
int64_t consistency_mem_limit = calc_max_consistency_memory(_process_mem_tracker->limit());
_consistency_mem_tracker = regist_tracker(consistency_mem_limit, "consistency", _process_mem_tracker.get());
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ class GlobalEnv {
MemTracker* jit_cache_mem_tracker() { return _jit_cache_mem_tracker.get(); }
MemTracker* update_mem_tracker() { return _update_mem_tracker.get(); }
MemTracker* chunk_allocator_mem_tracker() { return _chunk_allocator_mem_tracker.get(); }
MemTracker* passthrough_mem_tracker() { return _passthrough_mem_tracker.get(); }
MemTracker* clone_mem_tracker() { return _clone_mem_tracker.get(); }
MemTracker* consistency_mem_tracker() { return _consistency_mem_tracker.get(); }
MemTracker* replication_mem_tracker() { return _replication_mem_tracker.get(); }
Expand Down Expand Up @@ -218,6 +219,8 @@ class GlobalEnv {
std::shared_ptr<MemTracker> _update_mem_tracker;

std::shared_ptr<MemTracker> _chunk_allocator_mem_tracker;
// record mem usage in passthrough
std::shared_ptr<MemTracker> _passthrough_mem_tracker;

std::shared_ptr<MemTracker> _clone_mem_tracker;

Expand Down
5 changes: 4 additions & 1 deletion be/src/runtime/local_pass_through_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "column/chunk.h"
#include "common/logging.h"
#include "runtime/current_thread.h"
#include "runtime/exec_env.h"
#include "runtime/mem_tracker.h"

namespace starrocks {
Expand All @@ -29,6 +30,7 @@ class PassThroughSenderChannel {
~PassThroughSenderChannel() {
if (_physical_bytes > 0) {
CurrentThread::current().mem_consume(_physical_bytes);
GlobalEnv::GetInstance()->passthrough_mem_tracker()->release(_physical_bytes);
}
}

Expand All @@ -39,6 +41,7 @@ class PassThroughSenderChannel {
int64_t physical_bytes = CurrentThread::current().get_consumed_bytes() - before_bytes;
DCHECK_GE(physical_bytes, 0);
CurrentThread::current().mem_release(physical_bytes);
GlobalEnv::GetInstance()->passthrough_mem_tracker()->consume(physical_bytes);

std::unique_lock lock(_mutex);
_buffer.emplace_back(std::make_pair(std::move(clone), driver_sequence));
Expand All @@ -50,7 +53,7 @@ class PassThroughSenderChannel {
std::unique_lock lock(_mutex);
chunks->swap(_buffer);
bytes->swap(_bytes);

GlobalEnv::GetInstance()->passthrough_mem_tracker()->release(_physical_bytes);
// Consume physical bytes in current MemTracker, since later it would be released
CurrentThread::current().mem_consume(_physical_bytes);
_total_bytes -= _physical_bytes;
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/mem_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class MemTracker {
RESOURCE_GROUP,
RESOURCE_GROUP_BIG_QUERY,
JEMALLOC,
PASSTHROUGH,
};

/// 'byte_limit' < 0 means no limit
Expand Down
1 change: 1 addition & 0 deletions be/src/script/script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ void bind_exec_env(ForeignModule& m) {
REG_METHOD(GlobalEnv, jit_cache_mem_tracker);
REG_METHOD(GlobalEnv, update_mem_tracker);
REG_METHOD(GlobalEnv, chunk_allocator_mem_tracker);
REG_METHOD(GlobalEnv, passthrough_mem_tracker);
REG_METHOD(GlobalEnv, clone_mem_tracker);
REG_METHOD(GlobalEnv, consistency_mem_tracker);
REG_METHOD(GlobalEnv, connector_scan_pool_mem_tracker);
Expand Down
1 change: 1 addition & 0 deletions be/src/util/system_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ void SystemMetrics::_update_memory_metrics() {
SET_MEM_METRIC_VALUE(jit_cache_mem_tracker, jit_cache_mem_bytes)
SET_MEM_METRIC_VALUE(update_mem_tracker, update_mem_bytes)
SET_MEM_METRIC_VALUE(chunk_allocator_mem_tracker, chunk_allocator_mem_bytes)
SET_MEM_METRIC_VALUE(passthrough_mem_tracker, passthrough_mem_bytes)
SET_MEM_METRIC_VALUE(clone_mem_tracker, clone_mem_bytes)
SET_MEM_METRIC_VALUE(column_pool_mem_tracker, column_pool_mem_bytes)
SET_MEM_METRIC_VALUE(consistency_mem_tracker, consistency_mem_bytes)
Expand Down
1 change: 1 addition & 0 deletions be/src/util/system_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class MemoryMetrics {
METRIC_DEFINE_INT_GAUGE(jit_cache_mem_bytes, MetricUnit::BYTES);
METRIC_DEFINE_INT_GAUGE(update_mem_bytes, MetricUnit::BYTES);
METRIC_DEFINE_INT_GAUGE(chunk_allocator_mem_bytes, MetricUnit::BYTES);
METRIC_DEFINE_INT_GAUGE(passthrough_mem_bytes, MetricUnit::BYTES);
METRIC_DEFINE_INT_GAUGE(clone_mem_bytes, MetricUnit::BYTES);
METRIC_DEFINE_INT_GAUGE(consistency_mem_bytes, MetricUnit::BYTES);
METRIC_DEFINE_INT_GAUGE(datacache_mem_bytes, MetricUnit::BYTES);
Expand Down

0 comments on commit 94eb267

Please sign in to comment.