-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Feature] (batch write part3) Implement batch write on BE #52556
Conversation
c8b1718
to
906086d
Compare
74e915b
to
8fe9a95
Compare
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Quality Gate passedIssues Measures |
[Java-Extensions Incremental Coverage Report]✅ pass : 0 / 0 (0%) |
[BE Incremental Coverage Report]✅ pass : 513 / 626 (81.95%) file detail
|
[FE Incremental Coverage Report]✅ pass : 1 / 1 (100.00%) file detail
|
} | ||
} | ||
if (!create_if_missing) { | ||
return Status::NotFound(""); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add more detailed error message?
// TODO check if the error is retryable | ||
st = _send_rpc_request(async_ctx->data_ctx()); | ||
int64_t wait_ts = MonotonicNanos(); | ||
rpc_cost_ns += wait_ts - rpc_ts; | ||
st = _wait_for_stream_load_pipe(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
st
assigned twice without checking? is this intentional?
_dead_stream_load_pipe_ctxs.emplace(pipe_ctx); | ||
it = _alive_stream_load_pipe_ctxs.erase(it); | ||
} | ||
return st.ok() ? Status::CapacityLimitExceed("") : st; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add more info for error message?
if (!_alive_stream_load_pipe_ctxs.empty()) { | ||
return Status::OK(); | ||
} | ||
return Status::TimedOut(""); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add more info for error message?
Why I'm doing:
The third PR to support batching stream load on the server side. This pr mainly implements the logic on the BE side
What I'm doing:
New Components
IsomorphicBatchWrite
: manages load requests with the isomorphic parameters under the same db + table. These requests can be in a batch.BatchWriteMgr
: managesIsomorphicBatchWrite
TimeBoundedStreamLoadPipe
: a specialized implementation of theStreamLoadPipe
with a time constraint. It will finish itself after a period. After the pipe finishes, appending data will getCapacityLimitExceed
statusstream_load
is the newly added brpc interface. Compared to HTTP, the brpc interface is more suitable for scenarios with high concurrency, high throughput, and asynchronous operations. Currently, the brpc interface is mainly used in conjunction with Flink connector which depends on brpc-java. Note that we can extend this brpc interface for the normal stream load in the future to improve the communication efficiency.The following diagram shows the process of BE from receiving requests to pushing data into the plan
IsomorphicBatchWrite
according to the db + table + load parameters. The request then pushes the data to thebthread::ExecutionQueue
, and waits for the data is loaded. The queue can solve the concurrency problemTimeBoundedStreamLoadPipe
requestBatchWrite
to FE to request a load plan, and the executor will wait for the planTimeBoundedStreamLoadPipe
, and registers it toIsomorphicBatchWrite
7-10. the executor pushes the data to the pipe continously
Status::CapacityLimitExceed
if data is still pushed to it. After all pipes of the plan finish, there is no more data for this load, and data can be committedStatus::CapacityLimitExceed
when pushing data to the current pipe so that the data can be pushed to the next loadFixes #51085
What type of PR is this:
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check: