Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] (batch write part3) Implement batch write on BE #52556

Merged
merged 21 commits into from
Nov 14, 2024

Conversation

banmoy
Copy link
Contributor

@banmoy banmoy commented Nov 2, 2024

Why I'm doing:

The third PR to support batching stream load on the server side. This pr mainly implements the logic on the BE side

What I'm doing:

New Components

  • IsomorphicBatchWrite: manages load requests with the isomorphic parameters under the same db + table. These requests can be in a batch.
  • BatchWriteMgr: manages IsomorphicBatchWrite
  • TimeBoundedStreamLoadPipe: a specialized implementation of the StreamLoadPipe with a time constraint. It will finish itself after a period. After the pipe finishes, appending data will get CapacityLimitExceed status
  • API: BE can receive the load request from both http and brpc. The http api reuses the normal stream load. stream_load is the newly added brpc interface. Compared to HTTP, the brpc interface is more suitable for scenarios with high concurrency, high throughput, and asynchronous operations. Currently, the brpc interface is mainly used in conjunction with Flink connector which depends on brpc-java. Note that we can extend this brpc interface for the normal stream load in the future to improve the communication efficiency.

The following diagram shows the process of BE from receiving requests to pushing data into the plan

  1. BE receives request via http or brpc, and find the IsomorphicBatchWrite according to the db + table + load parameters. The request then pushes the data to the bthread::ExecutionQueue, and waits for the data is loaded. The queue can solve the concurrency problem
  2. An executor continuously pulls data from the queue, and try to push data to the TimeBoundedStreamLoadPipe
  3. but for the first time, there is no available pipe, so the executor sends thrift rpc requestBatchWrite to FE to request a load plan, and the executor will wait for the plan
  4. the plan is deployed to BE, and it uses pipeline engine
  5. each pipeline creates a TimeBoundedStreamLoadPipe, and registers it to IsomorphicBatchWrite
  6. the executor is notified after the pipe register successfully

7-10. the executor pushes the data to the pipe continously

  1. the pipe finishes itself after a period time, and returns Status::CapacityLimitExceed if data is still pushed to it. After all pipes of the plan finish, there is no more data for this load, and data can be committed
  2. the executor sends rpc to FE to request a new load plan if get Status::CapacityLimitExceed when pushing data to the current pipe so that the data can be pushed to the next load

image

Fixes #51085

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Does this PR entail a change in behavior?

  • Yes, this PR will result in a change in behavior.
  • No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • Parameter changes: default values, similar parameters but with different default values
  • Policy changes: use new policy to replace old one, functionality automatically enabled
  • Feature removed
  • Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This pr needs user documentation (for new or modified features or behaviors)
    • I have added documentation for my new feature or new function
  • This is a backport pr

Bugfix cherry-pick branch check:

  • I have checked the version labels which the pr will be auto-backported to the target branch
    • 3.3
    • 3.2
    • 3.1
    • 3.0
    • 2.5

@banmoy banmoy requested review from a team as code owners November 2, 2024 16:43
@wanpengfei-git wanpengfei-git requested a review from a team November 2, 2024 16:44
@mergify mergify bot assigned banmoy Nov 2, 2024
@wanpengfei-git wanpengfei-git removed the request for review from a team November 2, 2024 16:49
@banmoy banmoy force-pushed the be_batch branch 2 times, most recently from c8b1718 to 906086d Compare November 3, 2024 02:38
@wanpengfei-git wanpengfei-git requested a review from a team November 3, 2024 02:39
@banmoy banmoy force-pushed the be_batch branch 15 times, most recently from 74e915b to 8fe9a95 Compare November 5, 2024 17:43
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
Signed-off-by: PengFei Li <[email protected]>
gensrc/proto/internal_service.proto Outdated Show resolved Hide resolved
be/src/http/action/stream_load.cpp Outdated Show resolved Hide resolved
be/src/http/http_common.h Outdated Show resolved Hide resolved
Signed-off-by: PengFei Li <[email protected]>
@wyb wyb enabled auto-merge (squash) November 14, 2024 04:54
Copy link

[Java-Extensions Incremental Coverage Report]

pass : 0 / 0 (0%)

Copy link

[BE Incremental Coverage Report]

pass : 513 / 626 (81.95%)

file detail

path covered_line new_line coverage not_covered_line_detail
🔵 be/src/exec/pipeline/fragment_executor.cpp 0 23 00.00% [628, 629, 630, 631, 632, 633, 634, 636, 640, 653, 654, 656, 657, 660, 661, 662, 664, 665, 666, 668, 669, 670, 676]
🔵 be/src/service/internal_service.cpp 0 5 00.00% [1255, 1258, 1259, 1260, 1261]
🔵 be/src/runtime/stream_load/transaction_mgr.h 0 1 00.00% [95]
🔵 be/src/runtime/plan_fragment_executor.cpp 0 1 00.00% [514]
🔵 be/src/runtime/stream_load/stream_load_pipe.h 1 16 06.25% [57, 59, 61, 63, 64, 65, 66, 101, 102, 103, 104, 106, 107, 108, 109]
🔵 be/src/exec/csv_scanner.cpp 1 4 25.00% [308, 559, 560]
🔵 be/src/exec/json_scanner.cpp 4 9 44.44% [113, 114, 248, 249, 250]
🔵 be/src/exec/pipeline/fragment_context.cpp 5 11 45.45% [204, 205, 394, 395, 396, 398]
🔵 be/src/runtime/stream_load/time_bounded_stream_load_pipe.h 8 16 50.00% [29, 32, 33, 34, 35, 36, 40, 41]
🔵 be/src/runtime/stream_load/time_bounded_stream_load_pipe.cpp 14 17 82.35% [36, 37, 38]
🔵 be/src/runtime/batch_write/isomorphic_batch_write.cpp 249 284 87.68% [65, 69, 134, 135, 165, 194, 227, 228, 229, 234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 272, 273, 274, 275, 276, 277, 278, 279, 280, 281, 296, 350, 363, 396, 397, 415]
🔵 be/src/runtime/batch_write/batch_write_mgr.cpp 131 138 94.93% [40, 78, 104, 179, 203, 212, 218]
🔵 be/src/runtime/batch_write/batch_write_util.cpp 32 33 96.97% [81]
🔵 be/src/exec/file_scanner.cpp 1 1 100.00% []
🔵 be/src/runtime/exec_env.h 1 1 100.00% []
🔵 be/src/http/action/stream_load.cpp 22 22 100.00% []
🔵 be/src/runtime/stream_load/stream_load_pipe.cpp 17 17 100.00% []
🔵 be/src/runtime/exec_env.cpp 7 7 100.00% []
🔵 be/src/runtime/batch_write/batch_write_mgr.h 1 1 100.00% []
🔵 be/src/runtime/batch_write/batch_write_util.h 9 9 100.00% []
🔵 be/src/runtime/stream_load/stream_load_context.cpp 10 10 100.00% []

Copy link

[FE Incremental Coverage Report]

pass : 1 / 1 (100.00%)

file detail

path covered_line new_line coverage not_covered_line_detail
🔵 com/starrocks/service/FrontendServiceImpl.java 1 1 100.00% []

@wyb wyb merged commit 3fb71a5 into StarRocks:main Nov 14, 2024
57 checks passed
}
}
if (!create_if_missing) {
return Status::NotFound("");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add more detailed error message?

Comment on lines +305 to +309
// TODO check if the error is retryable
st = _send_rpc_request(async_ctx->data_ctx());
int64_t wait_ts = MonotonicNanos();
rpc_cost_ns += wait_ts - rpc_ts;
st = _wait_for_stream_load_pipe();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

st assigned twice without checking? is this intentional?

_dead_stream_load_pipe_ctxs.emplace(pipe_ctx);
it = _alive_stream_load_pipe_ctxs.erase(it);
}
return st.ok() ? Status::CapacityLimitExceed("") : st;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add more info for error message?

if (!_alive_stream_load_pipe_ctxs.empty()) {
return Status::OK();
}
return Status::TimedOut("");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add more info for error message?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support batching of stream loads on the server side
7 participants