From d1817551f7113f6f6d778c8f23d19d76e0bb47d6 Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Fri, 3 Jan 2025 10:35:42 +0800 Subject: [PATCH] [Enhancement] Use bthread lock in merge commit rpc (#54575) Signed-off-by: PengFei Li (cherry picked from commit 827547a7f3023cfb55856b0f9a7b5eeff9c8f945) --- .../runtime/batch_write/batch_write_mgr.cpp | 6 ++--- be/src/runtime/batch_write/batch_write_mgr.h | 4 ++-- .../batch_write/isomorphic_batch_write.cpp | 23 ++++++++++--------- .../batch_write/isomorphic_batch_write.h | 6 ++--- 4 files changed, 19 insertions(+), 20 deletions(-) diff --git a/be/src/runtime/batch_write/batch_write_mgr.cpp b/be/src/runtime/batch_write/batch_write_mgr.cpp index d98fff5a75668..9bbf9ebee286a 100644 --- a/be/src/runtime/batch_write/batch_write_mgr.cpp +++ b/be/src/runtime/batch_write/batch_write_mgr.cpp @@ -59,7 +59,7 @@ Status BatchWriteMgr::append_data(StreamLoadContext* data_ctx) { StatusOr BatchWriteMgr::_get_batch_write(const starrocks::BatchWriteId& batch_write_id, bool create_if_missing) { { - std::shared_lock lock(_mutex); + std::shared_lock lock(_rw_mutex); auto it = _batch_write_map.find(batch_write_id); if (it != _batch_write_map.end()) { return it->second; @@ -69,7 +69,7 @@ StatusOr BatchWriteMgr::_get_batch_write(const st return Status::NotFound(""); } - std::unique_lock lock(_mutex); + std::unique_lock lock(_rw_mutex); if (_stopped) { return Status::ServiceUnavailable("Batch write is stopped"); } @@ -92,7 +92,7 @@ StatusOr BatchWriteMgr::_get_batch_write(const st void BatchWriteMgr::stop() { std::vector stop_writes; { - std::unique_lock lock(_mutex); + std::unique_lock lock(_rw_mutex); if (_stopped) { return; } diff --git a/be/src/runtime/batch_write/batch_write_mgr.h b/be/src/runtime/batch_write/batch_write_mgr.h index a18b70aaa8d82..0b860a27de01d 100644 --- a/be/src/runtime/batch_write/batch_write_mgr.h +++ b/be/src/runtime/batch_write/batch_write_mgr.h @@ -14,12 +14,12 @@ #pragma once -#include #include #include "common/statusor.h" #include "runtime/batch_write/isomorphic_batch_write.h" #include "runtime/stream_load/stream_load_context.h" +#include "util/bthreads/bthread_shared_mutex.h" #include "util/bthreads/executor.h" namespace brpc { @@ -58,7 +58,7 @@ class BatchWriteMgr { bool create_if_missing); std::unique_ptr _executor; - std::shared_mutex _mutex; + bthreads::BThreadSharedMutex _rw_mutex; std::unordered_map _batch_write_map; bool _stopped{false}; diff --git a/be/src/runtime/batch_write/isomorphic_batch_write.cpp b/be/src/runtime/batch_write/isomorphic_batch_write.cpp index b017503eea043..10fade35811d9 100644 --- a/be/src/runtime/batch_write/isomorphic_batch_write.cpp +++ b/be/src/runtime/batch_write/isomorphic_batch_write.cpp @@ -140,7 +140,7 @@ Status IsomorphicBatchWrite::init() { void IsomorphicBatchWrite::stop() { { - std::unique_lock lock(_mutex); + std::unique_lock lock(_mutex); if (_stopped) { return; } @@ -155,7 +155,7 @@ void IsomorphicBatchWrite::stop() { std::vector release_contexts; { - std::unique_lock lock(_mutex); + std::unique_lock lock(_mutex); release_contexts.insert(release_contexts.end(), _alive_stream_load_pipe_ctxs.begin(), _alive_stream_load_pipe_ctxs.end()); release_contexts.insert(release_contexts.end(), _dead_stream_load_pipe_ctxs.begin(), @@ -172,7 +172,7 @@ void IsomorphicBatchWrite::stop() { } Status IsomorphicBatchWrite::register_stream_load_pipe(StreamLoadContext* pipe_ctx) { - std::unique_lock lock(_mutex); + std::unique_lock lock(_mutex); if (_stopped.load(std::memory_order_acquire)) { return Status::ServiceUnavailable("Batch write is stopped"); } @@ -188,7 +188,7 @@ Status IsomorphicBatchWrite::register_stream_load_pipe(StreamLoadContext* pipe_c void IsomorphicBatchWrite::unregister_stream_load_pipe(StreamLoadContext* pipe_ctx) { bool find = false; { - std::unique_lock lock(_mutex); + std::unique_lock lock(_mutex); find = _alive_stream_load_pipe_ctxs.erase(pipe_ctx) > 0; if (!find) { find = _dead_stream_load_pipe_ctxs.erase(pipe_ctx) > 0; @@ -202,7 +202,7 @@ void IsomorphicBatchWrite::unregister_stream_load_pipe(StreamLoadContext* pipe_c } bool IsomorphicBatchWrite::contain_pipe(StreamLoadContext* pipe_ctx) { - std::unique_lock lock(_mutex); + std::unique_lock lock(_mutex); auto it = _alive_stream_load_pipe_ctxs.find(pipe_ctx); if (it != _alive_stream_load_pipe_ctxs.end()) { return true; @@ -211,7 +211,7 @@ bool IsomorphicBatchWrite::contain_pipe(StreamLoadContext* pipe_ctx) { } bool IsomorphicBatchWrite::is_pipe_alive(starrocks::StreamLoadContext* pipe_ctx) { - std::unique_lock lock(_mutex); + std::unique_lock lock(_mutex); auto it = _alive_stream_load_pipe_ctxs.find(pipe_ctx); return it != _alive_stream_load_pipe_ctxs.end(); } @@ -322,9 +322,10 @@ Status IsomorphicBatchWrite::_execute_write(AsyncAppendDataContext* async_ctx) { } { SCOPED_RAW_TIMER(&wait_pipe_cost_ns); - std::unique_lock lock(_mutex); - _cv.wait_for(lock, std::chrono::milliseconds(config::batch_write_rpc_request_retry_interval_ms), - [&]() { return !_alive_stream_load_pipe_ctxs.empty(); }); + std::unique_lock lock(_mutex); + if (_alive_stream_load_pipe_ctxs.empty()) { + _cv.wait_for(lock, config::batch_write_rpc_request_retry_interval_ms * 1000); + } } } async_ctx->append_pipe_cost_ns.store(write_data_cost_ns); @@ -346,7 +347,7 @@ Status IsomorphicBatchWrite::_write_data_to_pipe(AsyncAppendDataContext* async_c while (true) { StreamLoadContext* pipe_ctx; { - std::unique_lock lock(_mutex); + std::unique_lock lock(_mutex); if (!_alive_stream_load_pipe_ctxs.empty()) { pipe_ctx = *(_alive_stream_load_pipe_ctxs.begin()); // take a reference to avoid being released when appending data to the pipe outside the lock @@ -372,7 +373,7 @@ Status IsomorphicBatchWrite::_write_data_to_pipe(AsyncAppendDataContext* async_c // if failed, the pipe can't be appended anymore and move it from // the alive to the dead, and wait for being unregistered { - std::unique_lock lock(_mutex); + std::unique_lock lock(_mutex); if (_alive_stream_load_pipe_ctxs.erase(pipe_ctx)) { _dead_stream_load_pipe_ctxs.emplace(pipe_ctx); } diff --git a/be/src/runtime/batch_write/isomorphic_batch_write.h b/be/src/runtime/batch_write/isomorphic_batch_write.h index 06482dc49513f..c74b9f333ff7b 100644 --- a/be/src/runtime/batch_write/isomorphic_batch_write.h +++ b/be/src/runtime/batch_write/isomorphic_batch_write.h @@ -20,8 +20,6 @@ #include #include -#include -#include #include #include @@ -73,8 +71,8 @@ class IsomorphicBatchWrite { bthreads::ThreadPoolExecutor* _executor; bool _batch_write_async{false}; - std::mutex _mutex; - std::condition_variable _cv; + bthread::Mutex _mutex; + bthread::ConditionVariable _cv; std::unordered_set _alive_stream_load_pipe_ctxs; std::unordered_set _dead_stream_load_pipe_ctxs;