Skip to content

Commit

Permalink
[Enhancement] Use bthread lock in merge commit rpc (#54575)
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
(cherry picked from commit 827547a)
  • Loading branch information
banmoy authored and mergify[bot] committed Jan 3, 2025
1 parent 215fc36 commit d181755
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 20 deletions.
6 changes: 3 additions & 3 deletions be/src/runtime/batch_write/batch_write_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Status BatchWriteMgr::append_data(StreamLoadContext* data_ctx) {
StatusOr<IsomorphicBatchWriteSharedPtr> BatchWriteMgr::_get_batch_write(const starrocks::BatchWriteId& batch_write_id,
bool create_if_missing) {
{
std::shared_lock<std::shared_mutex> lock(_mutex);
std::shared_lock<bthreads::BThreadSharedMutex> lock(_rw_mutex);
auto it = _batch_write_map.find(batch_write_id);
if (it != _batch_write_map.end()) {
return it->second;
Expand All @@ -69,7 +69,7 @@ StatusOr<IsomorphicBatchWriteSharedPtr> BatchWriteMgr::_get_batch_write(const st
return Status::NotFound("");
}

std::unique_lock<std::shared_mutex> lock(_mutex);
std::unique_lock<bthreads::BThreadSharedMutex> lock(_rw_mutex);
if (_stopped) {
return Status::ServiceUnavailable("Batch write is stopped");
}
Expand All @@ -92,7 +92,7 @@ StatusOr<IsomorphicBatchWriteSharedPtr> BatchWriteMgr::_get_batch_write(const st
void BatchWriteMgr::stop() {
std::vector<IsomorphicBatchWriteSharedPtr> stop_writes;
{
std::unique_lock<std::shared_mutex> lock(_mutex);
std::unique_lock<bthreads::BThreadSharedMutex> lock(_rw_mutex);
if (_stopped) {
return;
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/batch_write/batch_write_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

#pragma once

#include <shared_mutex>
#include <unordered_map>

#include "common/statusor.h"
#include "runtime/batch_write/isomorphic_batch_write.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/bthreads/bthread_shared_mutex.h"
#include "util/bthreads/executor.h"

namespace brpc {
Expand Down Expand Up @@ -58,7 +58,7 @@ class BatchWriteMgr {
bool create_if_missing);

std::unique_ptr<bthreads::ThreadPoolExecutor> _executor;
std::shared_mutex _mutex;
bthreads::BThreadSharedMutex _rw_mutex;
std::unordered_map<BatchWriteId, IsomorphicBatchWriteSharedPtr, BatchWriteIdHash, BatchWriteIdEqual>
_batch_write_map;
bool _stopped{false};
Expand Down
23 changes: 12 additions & 11 deletions be/src/runtime/batch_write/isomorphic_batch_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ Status IsomorphicBatchWrite::init() {

void IsomorphicBatchWrite::stop() {
{
std::unique_lock<std::mutex> lock(_mutex);
std::unique_lock<bthread::Mutex> lock(_mutex);
if (_stopped) {
return;
}
Expand All @@ -155,7 +155,7 @@ void IsomorphicBatchWrite::stop() {

std::vector<StreamLoadContext*> release_contexts;
{
std::unique_lock<std::mutex> lock(_mutex);
std::unique_lock<bthread::Mutex> lock(_mutex);
release_contexts.insert(release_contexts.end(), _alive_stream_load_pipe_ctxs.begin(),
_alive_stream_load_pipe_ctxs.end());
release_contexts.insert(release_contexts.end(), _dead_stream_load_pipe_ctxs.begin(),
Expand All @@ -172,7 +172,7 @@ void IsomorphicBatchWrite::stop() {
}

Status IsomorphicBatchWrite::register_stream_load_pipe(StreamLoadContext* pipe_ctx) {
std::unique_lock<std::mutex> lock(_mutex);
std::unique_lock<bthread::Mutex> lock(_mutex);
if (_stopped.load(std::memory_order_acquire)) {
return Status::ServiceUnavailable("Batch write is stopped");
}
Expand All @@ -188,7 +188,7 @@ Status IsomorphicBatchWrite::register_stream_load_pipe(StreamLoadContext* pipe_c
void IsomorphicBatchWrite::unregister_stream_load_pipe(StreamLoadContext* pipe_ctx) {
bool find = false;
{
std::unique_lock<std::mutex> lock(_mutex);
std::unique_lock<bthread::Mutex> lock(_mutex);
find = _alive_stream_load_pipe_ctxs.erase(pipe_ctx) > 0;
if (!find) {
find = _dead_stream_load_pipe_ctxs.erase(pipe_ctx) > 0;
Expand All @@ -202,7 +202,7 @@ void IsomorphicBatchWrite::unregister_stream_load_pipe(StreamLoadContext* pipe_c
}

bool IsomorphicBatchWrite::contain_pipe(StreamLoadContext* pipe_ctx) {
std::unique_lock<std::mutex> lock(_mutex);
std::unique_lock<bthread::Mutex> lock(_mutex);
auto it = _alive_stream_load_pipe_ctxs.find(pipe_ctx);
if (it != _alive_stream_load_pipe_ctxs.end()) {
return true;
Expand All @@ -211,7 +211,7 @@ bool IsomorphicBatchWrite::contain_pipe(StreamLoadContext* pipe_ctx) {
}

bool IsomorphicBatchWrite::is_pipe_alive(starrocks::StreamLoadContext* pipe_ctx) {
std::unique_lock<std::mutex> lock(_mutex);
std::unique_lock<bthread::Mutex> lock(_mutex);
auto it = _alive_stream_load_pipe_ctxs.find(pipe_ctx);
return it != _alive_stream_load_pipe_ctxs.end();
}
Expand Down Expand Up @@ -322,9 +322,10 @@ Status IsomorphicBatchWrite::_execute_write(AsyncAppendDataContext* async_ctx) {
}
{
SCOPED_RAW_TIMER(&wait_pipe_cost_ns);
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait_for(lock, std::chrono::milliseconds(config::batch_write_rpc_request_retry_interval_ms),
[&]() { return !_alive_stream_load_pipe_ctxs.empty(); });
std::unique_lock<bthread::Mutex> lock(_mutex);
if (_alive_stream_load_pipe_ctxs.empty()) {
_cv.wait_for(lock, config::batch_write_rpc_request_retry_interval_ms * 1000);
}
}
}
async_ctx->append_pipe_cost_ns.store(write_data_cost_ns);
Expand All @@ -346,7 +347,7 @@ Status IsomorphicBatchWrite::_write_data_to_pipe(AsyncAppendDataContext* async_c
while (true) {
StreamLoadContext* pipe_ctx;
{
std::unique_lock<std::mutex> lock(_mutex);
std::unique_lock<bthread::Mutex> lock(_mutex);
if (!_alive_stream_load_pipe_ctxs.empty()) {
pipe_ctx = *(_alive_stream_load_pipe_ctxs.begin());
// take a reference to avoid being released when appending data to the pipe outside the lock
Expand All @@ -372,7 +373,7 @@ Status IsomorphicBatchWrite::_write_data_to_pipe(AsyncAppendDataContext* async_c
// if failed, the pipe can't be appended anymore and move it from
// the alive to the dead, and wait for being unregistered
{
std::unique_lock<std::mutex> lock(_mutex);
std::unique_lock<bthread::Mutex> lock(_mutex);
if (_alive_stream_load_pipe_ctxs.erase(pipe_ctx)) {
_dead_stream_load_pipe_ctxs.emplace(pipe_ctx);
}
Expand Down
6 changes: 2 additions & 4 deletions be/src/runtime/batch_write/isomorphic_batch_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

#include <atomic>
#include <map>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_set>

Expand Down Expand Up @@ -73,8 +71,8 @@ class IsomorphicBatchWrite {
bthreads::ThreadPoolExecutor* _executor;
bool _batch_write_async{false};

std::mutex _mutex;
std::condition_variable _cv;
bthread::Mutex _mutex;
bthread::ConditionVariable _cv;
std::unordered_set<StreamLoadContext*> _alive_stream_load_pipe_ctxs;
std::unordered_set<StreamLoadContext*> _dead_stream_load_pipe_ctxs;

Expand Down

0 comments on commit d181755

Please sign in to comment.