From 12574b376e6111f0a870ab4aff5727d361ec9f8c Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Tue, 12 Nov 2024 18:58:55 +0800 Subject: [PATCH] Rename requestBatchWrite thrift rpc Signed-off-by: PengFei Li --- be/src/runtime/batch_write/isomorphic_batch_write.cpp | 6 +++--- .../batch_write/isomorphic_batch_write_test.cpp | 6 +++--- .../com/starrocks/service/FrontendServiceImpl.java | 8 ++++---- .../com/starrocks/service/FrontendServiceImplTest.java | 10 +++++----- gensrc/thrift/FrontendService.thrift | 6 +++--- 5 files changed, 18 insertions(+), 18 deletions(-) diff --git a/be/src/runtime/batch_write/isomorphic_batch_write.cpp b/be/src/runtime/batch_write/isomorphic_batch_write.cpp index 71f1cf1c061d8..ce0243689f7e7 100644 --- a/be/src/runtime/batch_write/isomorphic_batch_write.cpp +++ b/be/src/runtime/batch_write/isomorphic_batch_write.cpp @@ -359,7 +359,7 @@ Status IsomorphicBatchWrite::_wait_for_stream_load_pipe() { Status IsomorphicBatchWrite::_send_rpc_request(StreamLoadContext* data_ctx) { TNetworkAddress master_addr = get_master_address(); - TBatchWriteRequest request; + TMergeCommitRequest request; request.__set_db(_batch_write_id.db); request.__set_tbl(_batch_write_id.table); request.__set_user(data_ctx->auth.user); @@ -372,14 +372,14 @@ Status IsomorphicBatchWrite::_send_rpc_request(StreamLoadContext* data_ctx) { request.__set_backend_host(BackendOptions::get_localhost()); request.__set_params(_batch_write_id.load_params); - TBatchWriteResult response; + TMergeCommitResult response; Status st; #ifndef BE_TEST int64_t start_ts = MonotonicNanos(); st = ThriftRpcHelper::rpc( master_addr.hostname, master_addr.port, - [&request, &response](FrontendServiceConnection& client) { client->requestBatchWrite(response, request); }, + [&request, &response](FrontendServiceConnection& client) { client->requestMergeCommit(response, request); }, config::batch_write_rpc_reqeust_timeout_ms); TRACE_BATCH_WRITE << "receive requestBatchWrite response, " << _batch_write_id << ", user label: " << data_ctx->label << ", master: " << master_addr diff --git a/be/test/runtime/batch_write/isomorphic_batch_write_test.cpp b/be/test/runtime/batch_write/isomorphic_batch_write_test.cpp index 70f60300984cf..5119e48aa5780 100644 --- a/be/test/runtime/batch_write/isomorphic_batch_write_test.cpp +++ b/be/test/runtime/batch_write/isomorphic_batch_write_test.cpp @@ -158,7 +158,7 @@ TEST_F(IsomorphicBatchWriteTest, append_data_async) { SyncPoint::GetInstance()->SetCallBack("IsomorphicBatchWrite::send_rpc_request::status", [&](void* arg) { *((Status*)arg) = Status::OK(); }); SyncPoint::GetInstance()->SetCallBack("IsomorphicBatchWrite::send_rpc_request::response", [&](void* arg) { - TBatchWriteResult* result = (TBatchWriteResult*)arg; + TMergeCommitResult* result = (TMergeCommitResult*)arg; TStatus status; status.__set_status_code(TStatusCode::OK); result->__set_status(status); @@ -188,7 +188,7 @@ TEST_F(IsomorphicBatchWriteTest, append_data_async) { SyncPoint::GetInstance()->SetCallBack("IsomorphicBatchWrite::send_rpc_request::status", [&](void* arg) { *((Status*)arg) = Status::OK(); }); SyncPoint::GetInstance()->SetCallBack("IsomorphicBatchWrite::send_rpc_request::response", [&](void* arg) { - TBatchWriteResult* result = (TBatchWriteResult*)arg; + TMergeCommitResult* result = (TMergeCommitResult*)arg; TStatus status; status.__set_status_code(TStatusCode::OK); result->__set_status(status); @@ -250,7 +250,7 @@ void IsomorphicBatchWriteTest::test_append_data_sync_base(const Status& rpc_stat SyncPoint::GetInstance()->SetCallBack("IsomorphicBatchWrite::send_rpc_request::status", [&](void* arg) { *((Status*)arg) = Status::OK(); }); SyncPoint::GetInstance()->SetCallBack("IsomorphicBatchWrite::send_rpc_request::response", [&](void* arg) { - TBatchWriteResult* result = (TBatchWriteResult*)arg; + TMergeCommitResult* result = (TMergeCommitResult*)arg; TStatus status; status.__set_status_code(TStatusCode::OK); result->__set_status(status); diff --git a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java index c9e119d1bbdfd..ab704d209682d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java @@ -172,8 +172,6 @@ import com.starrocks.thrift.TAuthenticateParams; import com.starrocks.thrift.TBatchReportExecStatusParams; import com.starrocks.thrift.TBatchReportExecStatusResult; -import com.starrocks.thrift.TBatchWriteRequest; -import com.starrocks.thrift.TBatchWriteResult; import com.starrocks.thrift.TBeginRemoteTxnRequest; import com.starrocks.thrift.TBeginRemoteTxnResponse; import com.starrocks.thrift.TColumnDef; @@ -270,6 +268,8 @@ import com.starrocks.thrift.TMasterOpResult; import com.starrocks.thrift.TMasterResult; import com.starrocks.thrift.TMaterializedViewStatus; +import com.starrocks.thrift.TMergeCommitRequest; +import com.starrocks.thrift.TMergeCommitResult; import com.starrocks.thrift.TNetworkAddress; import com.starrocks.thrift.TNodesInfo; import com.starrocks.thrift.TObjectDependencyReq; @@ -1706,8 +1706,8 @@ TExecPlanFragmentParams streamLoadPutImpl(TStreamLoadPutRequest request) throws } @Override - public TBatchWriteResult requestBatchWrite(TBatchWriteRequest request) throws TException { - TBatchWriteResult result = new TBatchWriteResult(); + public TMergeCommitResult requestMergeCommit(TMergeCommitRequest request) throws TException { + TMergeCommitResult result = new TMergeCommitResult(); try { checkPasswordAndLoadPriv(request.getUser(), request.getPasswd(), request.getDb(), request.getTbl(), request.getUser_ip()); diff --git a/fe/fe-core/src/test/java/com/starrocks/service/FrontendServiceImplTest.java b/fe/fe-core/src/test/java/com/starrocks/service/FrontendServiceImplTest.java index 5756135295cdb..cac94a27c66a2 100644 --- a/fe/fe-core/src/test/java/com/starrocks/service/FrontendServiceImplTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/service/FrontendServiceImplTest.java @@ -45,8 +45,6 @@ import com.starrocks.sql.ast.PartitionDesc; import com.starrocks.sql.ast.SingleItemListPartitionDesc; import com.starrocks.thrift.TAuthInfo; -import com.starrocks.thrift.TBatchWriteRequest; -import com.starrocks.thrift.TBatchWriteResult; import com.starrocks.thrift.TColumnDef; import com.starrocks.thrift.TCreatePartitionRequest; import com.starrocks.thrift.TCreatePartitionResult; @@ -69,6 +67,8 @@ import com.starrocks.thrift.TLoadTxnBeginResult; import com.starrocks.thrift.TLoadTxnCommitRequest; import com.starrocks.thrift.TLoadTxnCommitResult; +import com.starrocks.thrift.TMergeCommitRequest; +import com.starrocks.thrift.TMergeCommitResult; import com.starrocks.thrift.TResourceUsage; import com.starrocks.thrift.TSetConfigRequest; import com.starrocks.thrift.TSetConfigResponse; @@ -1210,7 +1210,7 @@ public void testStreamLoadPutTimeout() throws UserException, TException, LockTim @Test public void testRequestBatchWrite() throws Exception { FrontendServiceImpl impl = new FrontendServiceImpl(exeEnv); - TBatchWriteRequest request = new TBatchWriteRequest(); + TMergeCommitRequest request = new TMergeCommitRequest(); request.setDb("test"); request.setTbl("site_access_hour"); request.setUser("root"); @@ -1233,7 +1233,7 @@ public RequestLoadResult requestLoad( // test success request { - TBatchWriteResult result = impl.requestBatchWrite(request); + TMergeCommitResult result = impl.requestMergeCommit(request); assertEquals(TStatusCode.OK, result.getStatus().getStatus_code()); assertEquals("test_label", result.getLabel()); } @@ -1241,7 +1241,7 @@ public RequestLoadResult requestLoad( // test authentication failure { request.setUser("fake_user"); - TBatchWriteResult result = impl.requestBatchWrite(request); + TMergeCommitResult result = impl.requestMergeCommit(request); assertEquals(TStatusCode.NOT_AUTHORIZED, result.getStatus().getStatus_code()); } } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 537bddc3cce3b..eb2dbf759f3b6 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -963,7 +963,7 @@ struct TStreamLoadPutResult { 2: optional InternalService.TExecPlanFragmentParams params } -struct TBatchWriteRequest { +struct TMergeCommitRequest { 1: optional string db 2: optional string tbl 3: optional string user @@ -974,7 +974,7 @@ struct TBatchWriteRequest { 8: optional map params; } -struct TBatchWriteResult { +struct TMergeCommitResult { 1: optional Status.TStatus status; // only valid for success 2: optional string label; @@ -1922,7 +1922,7 @@ service FrontendService { TStreamLoadPutResult streamLoadPut(1: TStreamLoadPutRequest request) - TBatchWriteResult requestBatchWrite(1: TBatchWriteRequest request) + TMergeCommitResult requestMergeCommit(1: TMergeCommitRequest request) Status.TStatus snapshotLoaderReport(1: TSnapshotLoaderReportRequest request)