Skip to content

Commit

Permalink
Optimize merge commit performance
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Dec 24, 2024
1 parent fc1e74d commit 42e800e
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 53 deletions.
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1504,6 +1504,12 @@ CONF_mInt32(max_committed_without_schema_rowset, "1000");

CONF_mInt32(apply_version_slow_log_sec, "30");

// The time that stream load pipe waits for the input. The pipe will block the pipeline scan executor
// util the input is available or the timeout is reached. Don't set this value too large to avoid
// blocking the pipeline scan executor for a long time.
CONF_mInt32(merge_commit_stream_load_pipe_block_wait_us, "500");
// The maximum number of bytes that the merge commit stream load pipe can buffer.
CONF_mInt64(merge_commit_stream_load_pipe_max_buffered_bytes, "1073741824");
CONF_Int32(batch_write_thread_pool_num_min, "0");
CONF_Int32(batch_write_thread_pool_num_max, "512");
CONF_Int32(batch_write_thread_pool_queue_size, "4096");
Expand Down
10 changes: 6 additions & 4 deletions be/src/runtime/batch_write/batch_write_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ StatusOr<StreamLoadContext*> BatchWriteMgr::create_and_register_pipe(
const std::map<std::string, std::string>& load_parameters, const std::string& label, long txn_id,
const TUniqueId& load_id, int32_t batch_write_interval_ms) {
std::string pipe_name = fmt::format("txn_{}_label_{}_id_{}", txn_id, label, print_id(load_id));
auto pipe = std::make_shared<TimeBoundedStreamLoadPipe>(pipe_name, batch_write_interval_ms);
auto pipe = std::make_shared<TimeBoundedStreamLoadPipe>(pipe_name, batch_write_interval_ms,
config::merge_commit_stream_load_pipe_block_wait_us,
config::merge_commit_stream_load_pipe_max_buffered_bytes);
RETURN_IF_ERROR(exec_env->load_stream_mgr()->put(load_id, pipe));
StreamLoadContext* ctx = new StreamLoadContext(exec_env, load_id);
ctx->ref();
Expand Down Expand Up @@ -188,11 +190,10 @@ void BatchWriteMgr::receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller*
}
ctx->timeout_second = timeout_second;
}
std::string remote_host;
butil::ip2hostname(cntl->remote_side().ip, &remote_host);
auto user_ip = butil::ip2str(cntl->remote_side().ip);
ctx->auth.user = request->user();
ctx->auth.passwd = request->passwd();
ctx->auth.user_ip = remote_host;
ctx->auth.user_ip.assign(user_ip.c_str());
ctx->load_parameters = get_load_parameters_from_brpc(parameters);

butil::IOBuf& io_buf = cntl->request_attachment();
Expand Down Expand Up @@ -221,6 +222,7 @@ void BatchWriteMgr::receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller*
}
ctx->buffer->pos += io_buf.size();
ctx->buffer->flip();
ctx->receive_bytes = io_buf.size();
ctx->status = exec_env->batch_write_mgr()->append_data(ctx);
}

Expand Down
97 changes: 58 additions & 39 deletions be/src/runtime/batch_write/isomorphic_batch_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ Status IsomorphicBatchWrite::append_data(StreamLoadContext* data_ctx) {
async_ctx->latch().wait();
async_ctx->total_cost_ns.store(MonotonicNanos() - async_ctx->create_time_ts);
TRACE_BATCH_WRITE << "wait async finish, " << _batch_write_id << ", user label: " << async_ctx->data_ctx()->label
<< ", data size: " << data_ctx->receive_bytes
<< ", user ip: " << data_ctx->auth.user_ip << ", data size: " << data_ctx->receive_bytes
<< ", total_cost: " << (async_ctx->total_cost_ns / 1000)
<< "us, total_async_cost: " << (async_ctx->total_async_cost_ns / 1000)
<< "us, task_pending_cost: " << (async_ctx->task_pending_cost_ns / 1000)
Expand Down Expand Up @@ -286,68 +286,87 @@ int IsomorphicBatchWrite::_execute_tasks(void* meta, bthread::TaskIterator<Task>
}

Status IsomorphicBatchWrite::_execute_write(AsyncAppendDataContext* async_ctx) {
Status st;
int64_t append_pipe_cost_ns = 0;
int64_t write_data_cost_ns = 0;
int64_t rpc_cost_ns = 0;
int64_t wait_pipe_cost_ns = 0;
int num_retries = 0;
while (num_retries <= config::batch_write_rpc_request_retry_num) {
Status st;
while (true) {
if (_stopped.load(std::memory_order_acquire)) {
return Status::ServiceUnavailable("Batch write is stopped");
st = Status::ServiceUnavailable("Batch write is stopped");
break;
}
int64_t append_ts = MonotonicNanos();
st = _write_data(async_ctx);
int64_t rpc_ts = MonotonicNanos();
append_pipe_cost_ns += rpc_ts - append_ts;
if (st.ok()) {
{
SCOPED_RAW_TIMER(&write_data_cost_ns);
st = _write_data_to_pipe(async_ctx);
}
if (st.ok() || num_retries >= config::batch_write_rpc_request_retry_num) {
break;
}
// TODO check if the error is retryable
st = _send_rpc_request(async_ctx->data_ctx());
int64_t wait_ts = MonotonicNanos();
rpc_cost_ns += wait_ts - rpc_ts;
st = _wait_for_stream_load_pipe();
wait_pipe_cost_ns += MonotonicNanos() - wait_ts;
num_retries += 1;
{
SCOPED_RAW_TIMER(&rpc_cost_ns);
// TODO check if the error is retryable if the return status is not ok
(void)_send_rpc_request(async_ctx->data_ctx());
}
{
SCOPED_RAW_TIMER(&wait_pipe_cost_ns);
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait_for(lock, std::chrono::milliseconds(config::batch_write_rpc_request_retry_interval_ms),
[&]() { return !_alive_stream_load_pipe_ctxs.empty(); });
}
}
async_ctx->append_pipe_cost_ns.store(append_pipe_cost_ns);
async_ctx->append_pipe_cost_ns.store(write_data_cost_ns);
async_ctx->rpc_cost_ns.store(rpc_cost_ns);
async_ctx->wait_pipe_cost_ns.store(wait_pipe_cost_ns);
async_ctx->num_retries.store(num_retries);
if (!st.ok()) {
std::stringstream stream;
stream << "Failed to write data to stream load pipe, num retry: " << num_retries
<< ", write_data: " << (write_data_cost_ns / 1000) << " us, rpc: " << (rpc_cost_ns / 1000)
<< "us, wait_pipe: " << (wait_pipe_cost_ns / 1000) << " us, last error: " << st;
st = Status::InternalError(stream.str());
}
return st;
}

Status IsomorphicBatchWrite::_write_data(AsyncAppendDataContext* async_ctx) {
// TODO write data outside the lock
std::unique_lock<std::mutex> lock(_mutex);
Status st;
Status IsomorphicBatchWrite::_write_data_to_pipe(AsyncAppendDataContext* async_ctx) {
StreamLoadContext* data_ctx = async_ctx->data_ctx();
for (auto it = _alive_stream_load_pipe_ctxs.begin(); it != _alive_stream_load_pipe_ctxs.end();) {
StreamLoadContext* pipe_ctx = *it;
// add reference to the buffer to avoid being released if append fails
while (true) {
StreamLoadContext* pipe_ctx;
{
std::unique_lock<std::mutex> lock(_mutex);
if (!_alive_stream_load_pipe_ctxs.empty()) {
pipe_ctx = *(_alive_stream_load_pipe_ctxs.begin());
// take a reference to avoid being released when appending data to the pipe outside the lock
pipe_ctx->ref();
} else {
return Status::CapacityLimitExceed("No available stream load pipe");
}
}
DeferOp defer([&] { StreamLoadContext::release(pipe_ctx); });
// task a reference to avoid being released by the pipe if append fails
ByteBufferPtr buffer = data_ctx->buffer;
st = pipe_ctx->body_sink->append(std::move(buffer));
Status st = pipe_ctx->body_sink->append(std::move(buffer));
if (st.ok()) {
data_ctx->buffer.reset();
async_ctx->pipe_left_active_ns.store(
static_cast<TimeBoundedStreamLoadPipe*>(pipe_ctx->body_sink.get())->left_active_ns());
async_ctx->set_txn(pipe_ctx->txn_id, pipe_ctx->label);
return st;
return Status::OK();
}
TRACE_BATCH_WRITE << "Fail to append data to stream load pipe, " << _batch_write_id
<< ", user label: " << data_ctx->label << ", txn_id: " << pipe_ctx->txn_id
<< ", label: " << pipe_ctx->label << ", status: " << st;
// if failed, the pipe can't be appended anymore and move it from
// the alive to the dead, and wait for being unregistered
{
std::unique_lock<std::mutex> lock(_mutex);
if (_alive_stream_load_pipe_ctxs.erase(pipe_ctx)) {
_dead_stream_load_pipe_ctxs.emplace(pipe_ctx);
}
}
_dead_stream_load_pipe_ctxs.emplace(pipe_ctx);
it = _alive_stream_load_pipe_ctxs.erase(it);
}
return st.ok() ? Status::CapacityLimitExceed("") : st;
}

Status IsomorphicBatchWrite::_wait_for_stream_load_pipe() {
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait_for(lock, std::chrono::milliseconds(config::batch_write_rpc_request_retry_interval_ms),
[&]() { return !_alive_stream_load_pipe_ctxs.empty(); });
if (!_alive_stream_load_pipe_ctxs.empty()) {
return Status::OK();
}
return Status::TimedOut("");
}

Status IsomorphicBatchWrite::_send_rpc_request(StreamLoadContext* data_ctx) {
Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/batch_write/isomorphic_batch_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ class IsomorphicBatchWrite {
static int _execute_tasks(void* meta, bthread::TaskIterator<Task>& iter);

Status _execute_write(AsyncAppendDataContext* async_ctx);
Status _write_data(AsyncAppendDataContext* data_ctx);
Status _wait_for_stream_load_pipe();
Status _write_data_to_pipe(AsyncAppendDataContext* data_ctx);
Status _send_rpc_request(StreamLoadContext* data_ctx);
Status _wait_for_load_status(StreamLoadContext* data_ctx, int64_t timeout_ns);

Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/stream_load/stream_load_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ StatusOr<ByteBufferPtr> StreamLoadPipe::read() {
StatusOr<ByteBufferPtr> StreamLoadPipe::no_block_read() {
std::unique_lock<std::mutex> l(_lock);

_get_cond.wait_for(l, std::chrono::milliseconds(_non_blocking_wait_ms),
_get_cond.wait_for(l, std::chrono::microseconds(_non_blocking_wait_us),
[&]() { return _cancelled || _finished || !_buf_queue.empty(); });

// cancelled
Expand Down Expand Up @@ -184,7 +184,7 @@ Status StreamLoadPipe::no_block_read(uint8_t* data, size_t* data_size, bool* eof
if (_read_buf == nullptr || !_read_buf->has_remaining()) {
std::unique_lock<std::mutex> l(_lock);

_get_cond.wait_for(l, std::chrono::milliseconds(_non_blocking_wait_ms),
_get_cond.wait_for(l, std::chrono::microseconds(_non_blocking_wait_us),
[&]() { return _cancelled || _finished || !_buf_queue.empty(); });

// cancelled
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/stream_load/stream_load_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ class StreamLoadPipe : public MessageBodySink {
size_t min_chunk_size = DEFAULT_STREAM_LOAD_PIPE_CHUNK_SIZE)
: StreamLoadPipe(false, -1, max_buffered_bytes, min_chunk_size) {}

StreamLoadPipe(bool non_blocking_read, int32_t non_blocking_wait_ms, size_t max_buffered_bytes,
StreamLoadPipe(bool non_blocking_read, int32_t non_blocking_wait_us, size_t max_buffered_bytes,
size_t min_chunk_size)
: _non_blocking_read(non_blocking_read),
_non_blocking_wait_ms(non_blocking_wait_ms),
_non_blocking_wait_us(non_blocking_wait_us),
_max_buffered_bytes(max_buffered_bytes),
_min_chunk_size(min_chunk_size) {}

Expand Down Expand Up @@ -115,7 +115,7 @@ class StreamLoadPipe : public MessageBodySink {
std::mutex _lock;
size_t _buffered_bytes{0};
bool _non_blocking_read{false};
int32_t _non_blocking_wait_ms;
int32_t _non_blocking_wait_us;
size_t _max_buffered_bytes;
size_t _min_chunk_size;
std::deque<ByteBufferPtr> _buf_queue;
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/stream_load/time_bounded_stream_load_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@

namespace starrocks {

static constexpr int32_t DEFAULT_STREAM_LOAD_PIPE_NON_BLOCKING_WAIT_MS = 50;
static constexpr int32_t DEFAULT_STREAM_LOAD_PIPE_NON_BLOCKING_WAIT_US = 500;

class TimeBoundedStreamLoadPipe : public StreamLoadPipe {
public:
TimeBoundedStreamLoadPipe(const std::string& name, int32_t active_window_ms,
int32_t non_blocking_wait_ms = DEFAULT_STREAM_LOAD_PIPE_NON_BLOCKING_WAIT_MS,
int32_t non_blocking_wait_us = DEFAULT_STREAM_LOAD_PIPE_NON_BLOCKING_WAIT_US,
size_t max_buffered_bytes = DEFAULT_STREAM_LOAD_PIPE_BUFFERED_BYTES)
: StreamLoadPipe(true, non_blocking_wait_ms, max_buffered_bytes, DEFAULT_STREAM_LOAD_PIPE_CHUNK_SIZE) {
: StreamLoadPipe(true, non_blocking_wait_us, max_buffered_bytes, DEFAULT_STREAM_LOAD_PIPE_CHUNK_SIZE) {
_name = name;
_active_window_ns = active_window_ms * (int64_t)1000000;
_start_time_ns = _get_current_ns();
Expand Down

0 comments on commit 42e800e

Please sign in to comment.