Skip to content

Commit

Permalink
Rename requestBatchWrite thrift rpc
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Nov 12, 2024
1 parent e1839be commit 12574b3
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 18 deletions.
6 changes: 3 additions & 3 deletions be/src/runtime/batch_write/isomorphic_batch_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ Status IsomorphicBatchWrite::_wait_for_stream_load_pipe() {

Status IsomorphicBatchWrite::_send_rpc_request(StreamLoadContext* data_ctx) {
TNetworkAddress master_addr = get_master_address();
TBatchWriteRequest request;
TMergeCommitRequest request;
request.__set_db(_batch_write_id.db);
request.__set_tbl(_batch_write_id.table);
request.__set_user(data_ctx->auth.user);
Expand All @@ -372,14 +372,14 @@ Status IsomorphicBatchWrite::_send_rpc_request(StreamLoadContext* data_ctx) {
request.__set_backend_host(BackendOptions::get_localhost());
request.__set_params(_batch_write_id.load_params);

TBatchWriteResult response;
TMergeCommitResult response;
Status st;

#ifndef BE_TEST
int64_t start_ts = MonotonicNanos();
st = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &response](FrontendServiceConnection& client) { client->requestBatchWrite(response, request); },
[&request, &response](FrontendServiceConnection& client) { client->requestMergeCommit(response, request); },
config::batch_write_rpc_reqeust_timeout_ms);
TRACE_BATCH_WRITE << "receive requestBatchWrite response, " << _batch_write_id
<< ", user label: " << data_ctx->label << ", master: " << master_addr
Expand Down
6 changes: 3 additions & 3 deletions be/test/runtime/batch_write/isomorphic_batch_write_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ TEST_F(IsomorphicBatchWriteTest, append_data_async) {
SyncPoint::GetInstance()->SetCallBack("IsomorphicBatchWrite::send_rpc_request::status",
[&](void* arg) { *((Status*)arg) = Status::OK(); });
SyncPoint::GetInstance()->SetCallBack("IsomorphicBatchWrite::send_rpc_request::response", [&](void* arg) {
TBatchWriteResult* result = (TBatchWriteResult*)arg;
TMergeCommitResult* result = (TMergeCommitResult*)arg;
TStatus status;
status.__set_status_code(TStatusCode::OK);
result->__set_status(status);
Expand Down Expand Up @@ -188,7 +188,7 @@ TEST_F(IsomorphicBatchWriteTest, append_data_async) {
SyncPoint::GetInstance()->SetCallBack("IsomorphicBatchWrite::send_rpc_request::status",
[&](void* arg) { *((Status*)arg) = Status::OK(); });
SyncPoint::GetInstance()->SetCallBack("IsomorphicBatchWrite::send_rpc_request::response", [&](void* arg) {
TBatchWriteResult* result = (TBatchWriteResult*)arg;
TMergeCommitResult* result = (TMergeCommitResult*)arg;
TStatus status;
status.__set_status_code(TStatusCode::OK);
result->__set_status(status);
Expand Down Expand Up @@ -250,7 +250,7 @@ void IsomorphicBatchWriteTest::test_append_data_sync_base(const Status& rpc_stat
SyncPoint::GetInstance()->SetCallBack("IsomorphicBatchWrite::send_rpc_request::status",
[&](void* arg) { *((Status*)arg) = Status::OK(); });
SyncPoint::GetInstance()->SetCallBack("IsomorphicBatchWrite::send_rpc_request::response", [&](void* arg) {
TBatchWriteResult* result = (TBatchWriteResult*)arg;
TMergeCommitResult* result = (TMergeCommitResult*)arg;
TStatus status;
status.__set_status_code(TStatusCode::OK);
result->__set_status(status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,6 @@
import com.starrocks.thrift.TAuthenticateParams;
import com.starrocks.thrift.TBatchReportExecStatusParams;
import com.starrocks.thrift.TBatchReportExecStatusResult;
import com.starrocks.thrift.TBatchWriteRequest;
import com.starrocks.thrift.TBatchWriteResult;
import com.starrocks.thrift.TBeginRemoteTxnRequest;
import com.starrocks.thrift.TBeginRemoteTxnResponse;
import com.starrocks.thrift.TColumnDef;
Expand Down Expand Up @@ -270,6 +268,8 @@
import com.starrocks.thrift.TMasterOpResult;
import com.starrocks.thrift.TMasterResult;
import com.starrocks.thrift.TMaterializedViewStatus;
import com.starrocks.thrift.TMergeCommitRequest;
import com.starrocks.thrift.TMergeCommitResult;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.thrift.TNodesInfo;
import com.starrocks.thrift.TObjectDependencyReq;
Expand Down Expand Up @@ -1706,8 +1706,8 @@ TExecPlanFragmentParams streamLoadPutImpl(TStreamLoadPutRequest request) throws
}

@Override
public TBatchWriteResult requestBatchWrite(TBatchWriteRequest request) throws TException {
TBatchWriteResult result = new TBatchWriteResult();
public TMergeCommitResult requestMergeCommit(TMergeCommitRequest request) throws TException {
TMergeCommitResult result = new TMergeCommitResult();
try {
checkPasswordAndLoadPriv(request.getUser(), request.getPasswd(), request.getDb(),
request.getTbl(), request.getUser_ip());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@
import com.starrocks.sql.ast.PartitionDesc;
import com.starrocks.sql.ast.SingleItemListPartitionDesc;
import com.starrocks.thrift.TAuthInfo;
import com.starrocks.thrift.TBatchWriteRequest;
import com.starrocks.thrift.TBatchWriteResult;
import com.starrocks.thrift.TColumnDef;
import com.starrocks.thrift.TCreatePartitionRequest;
import com.starrocks.thrift.TCreatePartitionResult;
Expand All @@ -69,6 +67,8 @@
import com.starrocks.thrift.TLoadTxnBeginResult;
import com.starrocks.thrift.TLoadTxnCommitRequest;
import com.starrocks.thrift.TLoadTxnCommitResult;
import com.starrocks.thrift.TMergeCommitRequest;
import com.starrocks.thrift.TMergeCommitResult;
import com.starrocks.thrift.TResourceUsage;
import com.starrocks.thrift.TSetConfigRequest;
import com.starrocks.thrift.TSetConfigResponse;
Expand Down Expand Up @@ -1210,7 +1210,7 @@ public void testStreamLoadPutTimeout() throws UserException, TException, LockTim
@Test
public void testRequestBatchWrite() throws Exception {
FrontendServiceImpl impl = new FrontendServiceImpl(exeEnv);
TBatchWriteRequest request = new TBatchWriteRequest();
TMergeCommitRequest request = new TMergeCommitRequest();
request.setDb("test");
request.setTbl("site_access_hour");
request.setUser("root");
Expand All @@ -1233,15 +1233,15 @@ public RequestLoadResult requestLoad(

// test success request
{
TBatchWriteResult result = impl.requestBatchWrite(request);
TMergeCommitResult result = impl.requestMergeCommit(request);
assertEquals(TStatusCode.OK, result.getStatus().getStatus_code());
assertEquals("test_label", result.getLabel());
}

// test authentication failure
{
request.setUser("fake_user");
TBatchWriteResult result = impl.requestBatchWrite(request);
TMergeCommitResult result = impl.requestMergeCommit(request);
assertEquals(TStatusCode.NOT_AUTHORIZED, result.getStatus().getStatus_code());
}
}
Expand Down
6 changes: 3 additions & 3 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ struct TStreamLoadPutResult {
2: optional InternalService.TExecPlanFragmentParams params
}

struct TBatchWriteRequest {
struct TMergeCommitRequest {
1: optional string db
2: optional string tbl
3: optional string user
Expand All @@ -974,7 +974,7 @@ struct TBatchWriteRequest {
8: optional map<string, string> params;
}

struct TBatchWriteResult {
struct TMergeCommitResult {
1: optional Status.TStatus status;
// only valid for success
2: optional string label;
Expand Down Expand Up @@ -1922,7 +1922,7 @@ service FrontendService {

TStreamLoadPutResult streamLoadPut(1: TStreamLoadPutRequest request)

TBatchWriteResult requestBatchWrite(1: TBatchWriteRequest request)
TMergeCommitResult requestMergeCommit(1: TMergeCommitRequest request)

Status.TStatus snapshotLoaderReport(1: TSnapshotLoaderReportRequest request)

Expand Down

0 comments on commit 12574b3

Please sign in to comment.