Skip to content

Commit

Permalink
[Enhancement] Improve some load channel and delta writer metrics
Browse files Browse the repository at this point in the history
Signed-off-by: wyb <[email protected]>
  • Loading branch information
wyb committed Jan 3, 2025
1 parent 04ddf66 commit 341d385
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 9 deletions.
8 changes: 7 additions & 1 deletion be/src/runtime/load_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,16 @@ void LoadChannel::add_chunks(const PTabletWriterAddChunksRequest& req, PTabletWr
watch.start();
faststring uncompressed_buffer;
std::unique_ptr<Chunk> chunk;
bool eos = false;
for (int i = 0; i < req.requests_size(); i++) {
auto& request = req.requests(i);
VLOG_RPC << "tablet writer add chunk, id=" << print_id(request.id()) << ", index_id=" << request.index_id()
<< ", sender_id=" << request.sender_id() << " request_index=" << i << " eos=" << request.eos();

if (request.eos()) {
eos = true;
}

if (i == 0 && request.has_chunk()) {
chunk = std::make_unique<Chunk>();
auto& pchunk = request.chunk();
Expand All @@ -222,10 +227,11 @@ void LoadChannel::add_chunks(const PTabletWriterAddChunksRequest& req, PTabletWr
<< ", index_id=" << request.index_id() << ", sender_id=" << request.sender_id()
<< " request_index=" << i << " eos=" << request.eos()
<< " err=" << response->status().error_msgs(0);
return;
break;
}
}
StarRocksMetrics::instance()->load_channel_add_chunks_total.increment(1);
StarRocksMetrics::instance()->load_channel_add_chunks_eos_total.increment(eos ? 1 : 0);
StarRocksMetrics::instance()->load_channel_add_chunks_duration_us.increment(watch.elapsed_time() / 1000);
report_profile(response, config::pipeline_print_profile);
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/local_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,7 @@ void LocalTabletsChannel::_update_peer_replica_profile(DeltaWriter* writer, Runt
ADD_AND_UPDATE_TIMER(profile, "CommitTime", writer_stat.commit_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CommitWaitFlushTime", writer_stat.commit_wait_flush_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CommitRowsetBuildTime", writer_stat.commit_rowset_build_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CommitFinishPkTime", writer_stat.commit_finish_pk_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CommitPkPreloadTime", writer_stat.commit_pk_preload_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CommitWaitReplicaTime", writer_stat.commit_wait_replica_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CommitTxnCommitTime", writer_stat.commit_txn_commit_time_ns);

Expand Down Expand Up @@ -1114,7 +1114,7 @@ void LocalTabletsChannel::_update_secondary_replica_profile(DeltaWriter* writer,
ADD_AND_UPDATE_TIMER(profile, "AddSegmentIOTime", writer_stat.add_segment_io_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CommitTime", writer_stat.commit_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CommitRowsetBuildTime", writer_stat.commit_rowset_build_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CommitFinishPkTime", writer_stat.commit_finish_pk_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CommitPkPreloadTime", writer_stat.commit_pk_preload_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CommitTxnCommitTime", writer_stat.commit_txn_commit_time_ns);

auto* segment_flush_token = writer->segment_flush_token();
Expand Down
14 changes: 10 additions & 4 deletions be/src/storage/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ Status DeltaWriter::_flush_memtable() {
auto elapsed_time = watch.elapsed_time();
ADD_COUNTER_RELAXED(_stats.memory_exceed_count, 1);
ADD_COUNTER_RELAXED(_stats.write_wait_flush_time_ns, elapsed_time);
StarRocksMetrics::instance()->delta_writer_wait_flush_task_total.increment(1);
StarRocksMetrics::instance()->delta_writer_wait_flush_duration_us.increment(elapsed_time / 1000);
return st;
}
Expand Down Expand Up @@ -750,7 +751,7 @@ Status DeltaWriter::commit() {
return st;
}
}
auto pk_finish_ts = watch.elapsed_time();
auto pk_preload_ts = watch.elapsed_time();

if (_replicate_token != nullptr) {
if (auto st = _replicate_token->wait(); UNLIKELY(!st.ok())) {
Expand Down Expand Up @@ -786,11 +787,16 @@ Status DeltaWriter::commit() {
ADD_COUNTER_RELAXED(_stats.commit_time_ns, watch.elapsed_time());
ADD_COUNTER_RELAXED(_stats.commit_wait_flush_time_ns, flush_ts);
ADD_COUNTER_RELAXED(_stats.commit_rowset_build_time_ns, rowset_build_ts - flush_ts);
ADD_COUNTER_RELAXED(_stats.commit_finish_pk_time_ns, pk_finish_ts - rowset_build_ts);
ADD_COUNTER_RELAXED(_stats.commit_wait_replica_time_ns, replica_ts - pk_finish_ts);
ADD_COUNTER_RELAXED(_stats.commit_pk_preload_time_ns, pk_preload_ts - rowset_build_ts);
ADD_COUNTER_RELAXED(_stats.commit_wait_replica_time_ns, replica_ts - pk_preload_ts);
ADD_COUNTER_RELAXED(_stats.commit_txn_commit_time_ns, commit_txn_ts - replica_ts);
StarRocksMetrics::instance()->delta_writer_commit_task_total.increment(1);
StarRocksMetrics::instance()->delta_writer_wait_flush_task_total.increment(1);
StarRocksMetrics::instance()->delta_writer_wait_flush_duration_us.increment(flush_ts / 1000);
StarRocksMetrics::instance()->delta_writer_wait_replica_duration_us.increment((replica_ts - pk_finish_ts) / 1000);
StarRocksMetrics::instance()->delta_writer_pk_preload_duration_us.increment((pk_preload_ts - rowset_build_ts) /
1000);
StarRocksMetrics::instance()->delta_writer_wait_replica_duration_us.increment((replica_ts - pk_preload_ts) / 1000);
StarRocksMetrics::instance()->delta_writer_txn_commit_duration_us.increment((commit_txn_ts - replica_ts) / 1000);
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ struct DeltaWriterStat {
// Time to build rowset in commit()
std::atomic_int64_t commit_rowset_build_time_ns = 0;
// Time to deal with primary key in commit() which may load data from disk
std::atomic_int64_t commit_finish_pk_time_ns = 0;
std::atomic_int64_t commit_pk_preload_time_ns = 0;
// Time to wait for replica sync in commit()
std::atomic_int64_t commit_wait_replica_time_ns = 0;
// Time to commit txn in commit()
Expand Down
5 changes: 5 additions & 0 deletions be/src/util/starrocks_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ StarRocksMetrics::StarRocksMetrics() : _metrics(_s_registry_name) {
REGISTER_STARROCKS_METRIC(pipe_poller_block_queue_len);

REGISTER_STARROCKS_METRIC(load_channel_add_chunks_total);
REGISTER_STARROCKS_METRIC(load_channel_add_chunks_eos_total);
REGISTER_STARROCKS_METRIC(load_channel_add_chunks_duration_us);
REGISTER_STARROCKS_METRIC(load_channel_add_chunks_wait_memtable_duration_us);
REGISTER_STARROCKS_METRIC(load_channel_add_chunks_wait_writer_duration_us);
Expand All @@ -71,8 +72,12 @@ StarRocksMetrics::StarRocksMetrics() : _metrics(_s_registry_name) {
REGISTER_STARROCKS_METRIC(async_delta_writer_task_execute_duration_us);
REGISTER_STARROCKS_METRIC(async_delta_writer_task_pending_duration_us);

REGISTER_STARROCKS_METRIC(delta_writer_commit_task_total);
REGISTER_STARROCKS_METRIC(delta_writer_wait_flush_task_total);
REGISTER_STARROCKS_METRIC(delta_writer_wait_flush_duration_us);
REGISTER_STARROCKS_METRIC(delta_writer_pk_preload_duration_us);
REGISTER_STARROCKS_METRIC(delta_writer_wait_replica_duration_us);
REGISTER_STARROCKS_METRIC(delta_writer_txn_commit_duration_us);

REGISTER_STARROCKS_METRIC(memtable_flush_total);
REGISTER_STARROCKS_METRIC(memtable_finalize_duration_us);
Expand Down
12 changes: 11 additions & 1 deletion be/src/util/starrocks_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ class StarRocksMetrics {
// Metrics for LoadChannel
// The number that LoadChannel#add_chunks is accessed
METRIC_DEFINE_INT_COUNTER(load_channel_add_chunks_total, MetricUnit::OPERATIONS);
// The number that LoadChannel#add_chunks eos is accessed
METRIC_DEFINE_INT_COUNTER(load_channel_add_chunks_eos_total, MetricUnit::OPERATIONS);
// Accumulated time that LoadChannel#add_chunks costs. The time can be divided into
// waiting memtable, waiting async delta writer, waiting replicas, and others.
METRIC_DEFINE_INT_COUNTER(load_channel_add_chunks_duration_us, MetricUnit::MICROSECONDS);
Expand All @@ -223,17 +225,25 @@ class StarRocksMetrics {
METRIC_DEFINE_INT_COUNTER(async_delta_writer_task_execute_duration_us, MetricUnit::MICROSECONDS);
// Accumulated time that task pends in the queue
METRIC_DEFINE_INT_COUNTER(async_delta_writer_task_pending_duration_us, MetricUnit::MICROSECONDS);

// Metrics for metadata lru cache
METRIC_DEFINE_INT_GAUGE(metadata_cache_bytes_total, MetricUnit::BYTES);

// Metrics for delta writer
// The number of eos task that executed
METRIC_DEFINE_INT_COUNTER(delta_writer_commit_task_total, MetricUnit::OPERATIONS);
// The number of wait flush task that executed, include memory exceed and commit
METRIC_DEFINE_INT_COUNTER(delta_writer_wait_flush_task_total, MetricUnit::OPERATIONS);
// Accumulated time that delta writer waits for memtable flush. It's part of
// async_delta_writer_task_execute_duration_us
METRIC_DEFINE_INT_COUNTER(delta_writer_wait_flush_duration_us, MetricUnit::MICROSECONDS);
// Accumulated time that delta writer preload rowset for pk table. It's part of
// async_delta_writer_task_execute_duration_us
METRIC_DEFINE_INT_COUNTER(delta_writer_pk_preload_duration_us, MetricUnit::MICROSECONDS);
// Accumulated time that delta writer waits for secondary replicas sync. It's part of
// async_delta_writer_task_execute_duration_us
METRIC_DEFINE_INT_COUNTER(delta_writer_wait_replica_duration_us, MetricUnit::MICROSECONDS);
// Accumulated time that delta writer commit txn. It's part of async_delta_writer_task_execute_duration_us
METRIC_DEFINE_INT_COUNTER(delta_writer_txn_commit_duration_us, MetricUnit::MICROSECONDS);

METRIC_DEFINE_INT_COUNTER(memtable_flush_total, MetricUnit::OPERATIONS);
METRIC_DEFINE_INT_COUNTER(memtable_finalize_duration_us, MetricUnit::MICROSECONDS);
Expand Down
5 changes: 5 additions & 0 deletions be/test/util/starrocks_metrics_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ TEST_F(StarRocksMetricsTest, test_metrics_register) {
assert_threadpool_metrics_register("remote_snapshot", instance);
assert_threadpool_metrics_register("replicate_snapshot", instance);
ASSERT_NE(nullptr, instance->get_metric("load_channel_add_chunks_total"));
ASSERT_NE(nullptr, instance->get_metric("load_channel_add_chunks_eos_total"));
ASSERT_NE(nullptr, instance->get_metric("load_channel_add_chunks_duration_us"));
ASSERT_NE(nullptr, instance->get_metric("load_channel_add_chunks_wait_memtable_duration_us"));
ASSERT_NE(nullptr, instance->get_metric("load_channel_add_chunks_wait_writer_duration_us"));
Expand All @@ -356,8 +357,12 @@ TEST_F(StarRocksMetricsTest, test_metrics_register) {
ASSERT_NE(nullptr, instance->get_metric("async_delta_writer_task_total"));
ASSERT_NE(nullptr, instance->get_metric("async_delta_writer_task_execute_duration_us"));
ASSERT_NE(nullptr, instance->get_metric("async_delta_writer_task_pending_duration_us"));
ASSERT_NE(nullptr, instance->get_metric("delta_writer_commit_task_total"));
ASSERT_NE(nullptr, instance->get_metric("delta_writer_wait_flush_task_total"));
ASSERT_NE(nullptr, instance->get_metric("delta_writer_wait_flush_duration_us"));
ASSERT_NE(nullptr, instance->get_metric("delta_writer_pk_preload_duration_us"));
ASSERT_NE(nullptr, instance->get_metric("delta_writer_wait_replica_duration_us"));
ASSERT_NE(nullptr, instance->get_metric("delta_writer_txn_commit_duration_us"));
ASSERT_NE(nullptr, instance->get_metric("memtable_finalize_duration_us"));
}

Expand Down

0 comments on commit 341d385

Please sign in to comment.