From aeba970ecb13c6bbc564ff01a9aac36141b7d88b Mon Sep 17 00:00:00 2001 From: Harbor Liu <460660596@qq.com> Date: Wed, 23 Oct 2024 15:43:33 +0800 Subject: [PATCH] [Enhancement] Rebase warehouse management framework (#52178) Signed-off-by: HangyuanLiu <460660596@qq.com> (cherry picked from commit db6b7c68373a71aaaf8cc29d59d4023ee919eebf) # Conflicts: # fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java # fe/fe-core/src/main/java/com/starrocks/sql/ast/AstVisitor.java # fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java # fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 --- .../com/starrocks/alter/SystemHandler.java | 6 +- .../java/com/starrocks/common/Config.java | 6 + .../com/starrocks/journal/JournalEntity.java | 10 ++ .../java/com/starrocks/lake/StarOSAgent.java | 49 ++++++ .../starrocks/persist/DropWarehouseLog.java | 47 ++++++ .../java/com/starrocks/persist/EditLog.java | 24 ++- .../com/starrocks/persist/OperationType.java | 10 +- .../com/starrocks/persist/gson/GsonUtils.java | 4 +- .../persist/metablock/SRMetaBlockID.java | 5 + .../starrocks/privilege/AccessController.java | 10 ++ .../starrocks/privilege/AuthorizationMgr.java | 10 +- .../DefaultAuthorizationProvider.java | 10 +- .../privilege/NativeAccessController.java | 12 ++ .../com/starrocks/privilege/ObjectType.java | 9 +- .../privilege/PrivilegeBuiltinConstants.java | 1 + .../starrocks/privilege/PrivilegeType.java | 8 +- .../privilege/WarehousePEntryObject.java | 128 ++++++++++++++ .../RangerStarRocksAccessController.java | 11 ++ .../com/starrocks/qe/DDLStmtExecutor.java | 45 +++++ .../java/com/starrocks/qe/ShowExecutor.java | 77 +++++++++ .../java/com/starrocks/qe/StmtExecutor.java | 26 +++ .../com/starrocks/server/GlobalStateMgr.java | 2 + .../starrocks/server/WarehouseManager.java | 49 +++++- .../com/starrocks/sql/analyzer/Analyzer.java | 55 ++++++ .../sql/analyzer/AstToStringBuilder.java | 4 +- .../starrocks/sql/analyzer/Authorizer.java | 18 ++ .../sql/analyzer/AuthorizerStmtVisitor.java | 157 ++++++++++++++++++ .../sql/analyzer/PrivilegeStmtAnalyzer.java | 6 +- .../sql/analyzer/WarehouseAnalyzer.java | 99 +++++++++++ .../starrocks/sql/ast/AddBackendClause.java | 12 +- .../sql/ast/AddComputeNodeClause.java | 11 +- .../com/starrocks/sql/ast/AstVisitor.java | 43 ++++- .../starrocks/sql/ast/DropBackendClause.java | 18 +- .../sql/ast/DropComputeNodeClause.java | 12 +- .../ast/warehouse/CreateWarehouseStmt.java | 87 ++++++++++ .../sql/ast/warehouse/DropWarehouseStmt.java | 59 +++++++ .../ast/warehouse/ResumeWarehouseStmt.java | 41 +++++ .../sql/ast/warehouse/SetWarehouseStmt.java | 47 ++++++ .../sql/ast/warehouse/ShowClustersStmt.java | 58 +++++++ .../sql/ast/warehouse/ShowNodesStmt.java | 75 +++++++++ .../sql/ast/warehouse/ShowWarehousesStmt.java | 78 +++++++++ .../ast/warehouse/SuspendWarehouseStmt.java | 41 +++++ .../com/starrocks/sql/parser/AstBuilder.java | 119 ++++++++++++- .../com/starrocks/sql/parser/StarRocks.g4 | 67 +++++++- .../starrocks/system/SystemInfoService.java | 103 +++++++----- .../starrocks/warehouse/DefaultWarehouse.java | 30 ++++ .../com/starrocks/warehouse/Warehouse.java | 6 + .../starrocks/analysis/BackendStmtTest.java | 16 +- .../cluster/SystemInfoServiceTest.java | 45 +++-- .../system/SystemInfoServiceTest.java | 10 +- 50 files changed, 1768 insertions(+), 108 deletions(-) create mode 100644 fe/fe-core/src/main/java/com/starrocks/persist/DropWarehouseLog.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/privilege/WarehousePEntryObject.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/sql/analyzer/WarehouseAnalyzer.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/CreateWarehouseStmt.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/DropWarehouseStmt.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/ResumeWarehouseStmt.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/SetWarehouseStmt.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/ShowClustersStmt.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/ShowNodesStmt.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/ShowWarehousesStmt.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/SuspendWarehouseStmt.java diff --git a/fe/fe-core/src/main/java/com/starrocks/alter/SystemHandler.java b/fe/fe-core/src/main/java/com/starrocks/alter/SystemHandler.java index a48b3e9e1ae05..e2ffaaf520df1 100644 --- a/fe/fe-core/src/main/java/com/starrocks/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/com/starrocks/alter/SystemHandler.java @@ -161,7 +161,7 @@ public Void visitModifyFrontendHostClause(ModifyFrontendAddressClause clause, Vo @Override public Void visitAddBackendClause(AddBackendClause clause, Void context) { ErrorReport.wrapWithRuntimeException(() -> { - GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addBackends(clause.getHostPortPairs()); + GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addBackends(clause); }); return null; } @@ -291,7 +291,7 @@ public Void visitModifyBrokerClause(ModifyBrokerClause clause, Void context) { @Override public Void visitAddComputeNodeClause(AddComputeNodeClause clause, Void context) { ErrorReport.wrapWithRuntimeException(() -> { - GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addComputeNodes(clause.getHostPortPairs()); + GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addComputeNodes(clause); }); return null; } @@ -299,7 +299,7 @@ public Void visitAddComputeNodeClause(AddComputeNodeClause clause, Void context) @Override public Void visitDropComputeNodeClause(DropComputeNodeClause clause, Void context) { ErrorReport.wrapWithRuntimeException(() -> { - GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().dropComputeNodes(clause.getHostPortPairs()); + GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().dropComputeNodes(clause); }); return null; } diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index b04ef7e593fd5..c33c7b089cec6 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -2666,6 +2666,12 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static int lake_compaction_history_size = 20; + @ConfField(mutable = true) + public static String lake_compaction_warehouse = "default_warehouse"; + + @ConfField(mutable = true) + public static String lake_background_warehouse = "default_warehouse"; + // e.g. "tableId1;tableId2" @ConfField(mutable = true) public static String lake_compaction_disable_tables = ""; diff --git a/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java b/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java index d7814430a3852..c35f418db9243 100644 --- a/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java @@ -102,6 +102,7 @@ import com.starrocks.persist.DropPartitionInfo; import com.starrocks.persist.DropResourceOperationLog; import com.starrocks.persist.DropStorageVolumeLog; +import com.starrocks.persist.DropWarehouseLog; import com.starrocks.persist.GlobalVarPersistInfo; import com.starrocks.persist.HbPackage; import com.starrocks.persist.ImpersonatePrivInfo; @@ -161,6 +162,7 @@ import com.starrocks.system.Frontend; import com.starrocks.transaction.TransactionState; import com.starrocks.transaction.TransactionStateBatch; +import com.starrocks.warehouse.Warehouse; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -954,6 +956,14 @@ public void readFields(DataInput in) throws IOException { data = new Text(Text.readBinary(in)); break; } + case OperationType.OP_CREATE_WAREHOUSE: + case OperationType.OP_ALTER_WAREHOUSE: + data = GsonUtils.GSON.fromJson(Text.readString(in), Warehouse.class); + break; + case OperationType.OP_DROP_WAREHOUSE: { + data = DropWarehouseLog.read(in); + break; + } default: { if (Config.metadata_ignore_unknown_operation_type) { LOG.warn("UNKNOWN Operation Type {}", opCode); diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java b/fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java index 96e344da7cfd9..f3a062723d288 100644 --- a/fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java +++ b/fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java @@ -42,6 +42,7 @@ import com.staros.proto.StatusCode; import com.staros.proto.UpdateMetaGroupInfo; import com.staros.proto.WorkerGroupDetailInfo; +import com.staros.proto.WorkerGroupSpec; import com.staros.proto.WorkerInfo; import com.staros.util.LockCloseable; import com.starrocks.common.Config; @@ -56,6 +57,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -714,6 +716,53 @@ public List listWorkerGroupIpPort(long workerGroupId) throws UserExcepti } } + // remove previous worker with same backend id + private void tryRemovePreviousWorkerGroup(long workerGroupId) { + try (LockCloseable lock = new LockCloseable(rwLock.writeLock())) { + Iterator> iterator = workerToNode.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + long nodeId = entry.getValue(); + long workerId = entry.getKey(); + ComputeNode node = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getBackendOrComputeNode(nodeId); + if (node.getWorkerGroupId() == workerGroupId) { + iterator.remove(); + workerToId.entrySet().removeIf(e -> e.getValue() == workerId); + } + } + } + } + + public long createWorkerGroup(String size) throws DdlException { + prepare(); + + // size should be x0, x1, x2, x4... + WorkerGroupSpec spec = WorkerGroupSpec.newBuilder().setSize(size).build(); + // owner means tenant, now there is only one tenant, so pass "Starrocks" to starMgr + String owner = "Starrocks"; + WorkerGroupDetailInfo result = null; + try { + result = client.createWorkerGroup(serviceId, owner, spec, Collections.emptyMap(), + Collections.emptyMap()); + } catch (StarClientException e) { + LOG.warn("Failed to create worker group. error: {}", e.getMessage()); + throw new DdlException("Failed to create worker group. error: " + e.getMessage()); + } + return result.getGroupId(); + } + + public void deleteWorkerGroup(long groupId) throws DdlException { + prepare(); + try { + client.deleteWorkerGroup(serviceId, groupId); + } catch (StarClientException e) { + LOG.warn("Failed to delete worker group {}. error: {}", groupId, e.getMessage()); + throw new DdlException("Failed to delete worker group. error: " + e.getMessage()); + } + + tryRemovePreviousWorkerGroup(groupId); + } + // dump all starmgr meta, for DEBUG purpose public String dump() { prepare(); diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/DropWarehouseLog.java b/fe/fe-core/src/main/java/com/starrocks/persist/DropWarehouseLog.java new file mode 100644 index 0000000000000..55c487349d864 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/persist/DropWarehouseLog.java @@ -0,0 +1,47 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.persist; + +import com.google.gson.annotations.SerializedName; +import com.starrocks.common.io.Text; +import com.starrocks.common.io.Writable; +import com.starrocks.persist.gson.GsonUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class DropWarehouseLog implements Writable { + @SerializedName(value = "name") + private String warehouseName; + + public DropWarehouseLog(String warehouseName) { + this.warehouseName = warehouseName; + } + + public String getWarehouseName() { + return warehouseName; + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static DropWarehouseLog read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, DropWarehouseLog.class); + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java b/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java index b3eb60009b1b9..6f3a29e93616a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java @@ -95,6 +95,7 @@ import com.starrocks.scheduler.persist.TaskRunStatusChange; import com.starrocks.server.GlobalStateMgr; import com.starrocks.server.LocalMetastore; +import com.starrocks.server.WarehouseManager; import com.starrocks.sql.ast.UserIdentity; import com.starrocks.staros.StarMgrJournal; import com.starrocks.staros.StarMgrServer; @@ -115,6 +116,7 @@ import com.starrocks.thrift.TNetworkAddress; import com.starrocks.transaction.TransactionState; import com.starrocks.transaction.TransactionStateBatch; +import com.starrocks.warehouse.Warehouse; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -1197,6 +1199,24 @@ public void loadJournal(GlobalStateMgr globalStateMgr, JournalEntity journal) GlobalStateMgr.getCurrentState().getKeyMgr().replayAddKey(keyPB); break; } + case OperationType.OP_CREATE_WAREHOUSE: { + Warehouse wh = (Warehouse) journal.getData(); + WarehouseManager warehouseMgr = globalStateMgr.getWarehouseMgr(); + warehouseMgr.replayCreateWarehouse(wh); + break; + } + case OperationType.OP_DROP_WAREHOUSE: { + DropWarehouseLog log = (DropWarehouseLog) journal.getData(); + WarehouseManager warehouseMgr = globalStateMgr.getWarehouseMgr(); + warehouseMgr.replayDropWarehouse(log); + break; + } + case OperationType.OP_ALTER_WAREHOUSE: { + Warehouse wh = (Warehouse) journal.getData(); + WarehouseManager warehouseMgr = globalStateMgr.getWarehouseMgr(); + warehouseMgr.replayAlterWarehouse(wh); + break; + } default: { if (Config.metadata_ignore_unknown_operation_type) { LOG.warn("UNKNOWN Operation Type {}", opCode); @@ -1216,7 +1236,7 @@ public void loadJournal(GlobalStateMgr globalStateMgr, JournalEntity journal) /** * submit log to queue, wait for JournalWriter */ - protected void logEdit(short op, Writable writable) { + public void logEdit(short op, Writable writable) { JournalTask task = submitLog(op, writable, -1); waitInfinity(task); } @@ -1966,7 +1986,7 @@ public void logPipeOp(PipeOpEntry opEntry) { logEdit(OperationType.OP_PIPE, opEntry); } - private void logJsonObject(short op, Object obj) { + public void logJsonObject(short op, Object obj) { logEdit(op, out -> Text.writeString(out, GsonUtils.GSON.toJson(obj))); } diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java b/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java index fa047b30320a9..3bf01c69f1dbb 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java @@ -928,10 +928,15 @@ public class OperationType { public static final short OP_ADD_KEY = 13512; /** - * NOTICE: OperationType cannot use a value exceeding 20000, and an error will be reported if it exceeds + * NOTICE: OperationType cannot use a value exceeding 20000, please follow the above sequence number */ public static final short OP_TYPE_EOF = 20000; + // warehouse + public static final short OP_CREATE_WAREHOUSE = 20101; + public static final short OP_DROP_WAREHOUSE = 20102; + public static final short OP_ALTER_WAREHOUSE = 20103; + public static final ImmutableSet IGNORABLE_OPERATIONS = buildIgnorableOperations(); private static ImmutableSet buildIgnorableOperations() { @@ -950,7 +955,8 @@ private static ImmutableSet buildIgnorableOperations() { System.exit(-1); } - if (opType > OperationType.OP_TYPE_EOF) { + if (opType > OperationType.OP_TYPE_EOF && + opType != OP_CREATE_WAREHOUSE && opType != OP_DROP_WAREHOUSE && opType != OP_ALTER_WAREHOUSE) { LOG.fatal("OperationType cannot use a value exceeding 20000, " + "and an error will be reported if it exceeds : {} = {}", field.getName(), opType); System.exit(-1); diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/com/starrocks/persist/gson/GsonUtils.java index feed6f3f78ef3..24316c9d395b0 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/gson/GsonUtils.java @@ -167,7 +167,7 @@ import com.starrocks.privilege.TablePEntryObject; import com.starrocks.privilege.UserPEntryObject; import com.starrocks.privilege.ViewPEntryObject; -import com.starrocks.privilege.WarehouseFCPEntryObject; +import com.starrocks.privilege.WarehousePEntryObject; import com.starrocks.proto.EncryptionKeyPB; import com.starrocks.replication.ReplicationTxnCommitAttachment; import com.starrocks.server.SharedDataStorageVolumeMgr; @@ -338,7 +338,7 @@ public class GsonUtils { .registerSubtype(CatalogPEntryObject.class, "CatalogPEntryObject") .registerSubtype(ResourceGroupPEntryObject.class, "ResourceGroupPEntryObject") .registerSubtype(StorageVolumePEntryObject.class, "StorageVolumePEntryObject") - .registerSubtype(WarehouseFCPEntryObject.class, "WarehousePEntryObject") + .registerSubtype(WarehousePEntryObject.class, "WarehousePEntryObject") .registerSubtype(PipePEntryObject.class, "PipePEntryObject") .registerSubtype(PolicyFCEntryObject.class, "PolicyPEntryObject"); diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/metablock/SRMetaBlockID.java b/fe/fe-core/src/main/java/com/starrocks/persist/metablock/SRMetaBlockID.java index f4db1e9e235f9..f46283654c213 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/metablock/SRMetaBlockID.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/metablock/SRMetaBlockID.java @@ -97,6 +97,11 @@ public int getId() { public static final SRMetaBlockID PIPE_MGR = new SRMetaBlockID(32); + /** + * NOTICE: SRMetaBlockID cannot use a value exceeding 20000, please follow the above sequence number + */ + public static final SRMetaBlockID WAREHOUSE_MGR = new SRMetaBlockID(20001); + @Override public String toString() { return String.valueOf(id); diff --git a/fe/fe-core/src/main/java/com/starrocks/privilege/AccessController.java b/fe/fe-core/src/main/java/com/starrocks/privilege/AccessController.java index 809b5f22f93f5..33d41df2c1a2e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/privilege/AccessController.java +++ b/fe/fe-core/src/main/java/com/starrocks/privilege/AccessController.java @@ -189,4 +189,14 @@ default Map getColumnMaskingPolicy(ConnectContext context, TableNa default Expr getRowAccessPolicy(ConnectContext currentUser, TableName tableName) { return null; } + + default void checkWarehouseAction(UserIdentity currentUser, Set roleIds, String name, PrivilegeType privilegeType) + throws AccessDeniedException { + throw new AccessDeniedException(); + } + + default void checkAnyActionOnWarehouse(UserIdentity currentUser, Set roleIds, String name) + throws AccessDeniedException { + throw new AccessDeniedException(); + } } \ No newline at end of file diff --git a/fe/fe-core/src/main/java/com/starrocks/privilege/AuthorizationMgr.java b/fe/fe-core/src/main/java/com/starrocks/privilege/AuthorizationMgr.java index 879f773169304..5c37d0693199e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/privilege/AuthorizationMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/privilege/AuthorizationMgr.java @@ -148,7 +148,8 @@ public void initBuiltinRolesAndUsers() { ObjectType.RESOURCE_GROUP, ObjectType.PIPE, ObjectType.GLOBAL_FUNCTION, - ObjectType.STORAGE_VOLUME); + ObjectType.STORAGE_VOLUME, + ObjectType.WAREHOUSE); for (ObjectType objectType : objectTypes) { initPrivilegeCollectionAllObjects(rolePrivilegeCollection, objectType, provider.getAvailablePrivType(objectType)); @@ -195,9 +196,11 @@ public void initBuiltinRolesAndUsers() { initPrivilegeCollections( rolePrivilegeCollection, ObjectType.SYSTEM, - List.of(PrivilegeType.NODE), + List.of(PrivilegeType.NODE, PrivilegeType.CREATE_WAREHOUSE), null, false); + initPrivilegeCollectionAllObjects(rolePrivilegeCollection, ObjectType.WAREHOUSE, + provider.getAvailablePrivType(ObjectType.WAREHOUSE)); rolePrivilegeCollection.disableMutable(); // 4. user_admin @@ -270,7 +273,8 @@ protected void initPrivilegeCollectionAllObjects( } else if (ObjectType.RESOURCE.equals(objectType) || ObjectType.CATALOG.equals(objectType) || ObjectType.RESOURCE_GROUP.equals(objectType) - || ObjectType.STORAGE_VOLUME.equals(objectType)) { + || ObjectType.STORAGE_VOLUME.equals(objectType) + || ObjectType.WAREHOUSE.equals(objectType)) { objects.add(provider.generateObject(objectType, Lists.newArrayList("*"), globalStateMgr)); collection.grant(objectType, actionList, objects, false); diff --git a/fe/fe-core/src/main/java/com/starrocks/privilege/DefaultAuthorizationProvider.java b/fe/fe-core/src/main/java/com/starrocks/privilege/DefaultAuthorizationProvider.java index 27bcd28fa103f..ee259f6e893c5 100644 --- a/fe/fe-core/src/main/java/com/starrocks/privilege/DefaultAuthorizationProvider.java +++ b/fe/fe-core/src/main/java/com/starrocks/privilege/DefaultAuthorizationProvider.java @@ -66,7 +66,8 @@ public DefaultAuthorizationProvider() { PrivilegeType.REPOSITORY, PrivilegeType.CREATE_RESOURCE_GROUP, PrivilegeType.CREATE_GLOBAL_FUNCTION, - PrivilegeType.CREATE_STORAGE_VOLUME)); + PrivilegeType.CREATE_STORAGE_VOLUME, + PrivilegeType.CREATE_WAREHOUSE)); typeToActionList.put(ObjectType.USER, Lists.newArrayList( PrivilegeType.IMPERSONATE)); @@ -114,6 +115,11 @@ public DefaultAuthorizationProvider() { PrivilegeType.DROP, PrivilegeType.ALTER, PrivilegeType.USAGE)); + + typeToActionList.put(ObjectType.WAREHOUSE, Lists.newArrayList( + PrivilegeType.USAGE, + PrivilegeType.ALTER, + PrivilegeType.DROP)); } public static final String UNEXPECTED_TYPE = "unexpected type "; @@ -185,6 +191,8 @@ public PEntryObject generateObject(ObjectType objectType, List objectTok return StorageVolumePEntryObject.generate(mgr, objectTokens); } else if (ObjectType.PIPE.equals(objectType)) { return PipePEntryObject.generate(mgr, objectTokens); + } else if (ObjectType.WAREHOUSE.equals(objectType)) { + return WarehousePEntryObject.generate(mgr, objectTokens); } throw new PrivilegeException(UNEXPECTED_TYPE + objectType.name()); } diff --git a/fe/fe-core/src/main/java/com/starrocks/privilege/NativeAccessController.java b/fe/fe-core/src/main/java/com/starrocks/privilege/NativeAccessController.java index f4059c5c01f57..1f39d602d97cf 100644 --- a/fe/fe-core/src/main/java/com/starrocks/privilege/NativeAccessController.java +++ b/fe/fe-core/src/main/java/com/starrocks/privilege/NativeAccessController.java @@ -393,4 +393,16 @@ private static void checkAnyActionOnFunctionObject(UserIdentity currentUser, Set throw new AccessDeniedException(); } } + + @Override + public void checkWarehouseAction(UserIdentity currentUser, Set roleIds, String name, PrivilegeType privilegeType) + throws AccessDeniedException { + checkObjectTypeAction(currentUser, roleIds, privilegeType, ObjectType.WAREHOUSE, + Collections.singletonList(name)); + } + + @Override + public void checkAnyActionOnWarehouse(UserIdentity currentUser, Set roleIds, String name) throws AccessDeniedException { + checkAnyActionOnObject(currentUser, roleIds, ObjectType.WAREHOUSE, Collections.singletonList(name)); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/privilege/ObjectType.java b/fe/fe-core/src/main/java/com/starrocks/privilege/ObjectType.java index 450ae8139738c..4779fecc06297 100644 --- a/fe/fe-core/src/main/java/com/starrocks/privilege/ObjectType.java +++ b/fe/fe-core/src/main/java/com/starrocks/privilege/ObjectType.java @@ -68,6 +68,11 @@ public String plural() { public static final ObjectType PIPE = new ObjectType(13); public static final ObjectType COLUMN = new ObjectType(14); + /** + * NOTICE: ObjectType cannot use a value exceeding 20000, please follow the above sequence number + */ + public static final ObjectType WAREHOUSE = new ObjectType(20003); + public static final Set VALID_OBJECT_TYPE = new ImmutableSet.Builder().add( TABLE, DATABASE, @@ -82,7 +87,8 @@ public String plural() { GLOBAL_FUNCTION, STORAGE_VOLUME, PIPE, - COLUMN + COLUMN, + WAREHOUSE ).build(); public static final Map> OBJECT_TO_NAME = @@ -101,6 +107,7 @@ public String plural() { .put(12, new Pair<>("STORAGE VOLUME", "STORAGE VOLUMES")) .put(13, new Pair<>("PIPE", "PIPES")) .put(14, new Pair<>("COLUMN", "COLUMNS")) + .put(20003, new Pair<>("WAREHOUSE", "WAREHOUSES")) .build(); public static final Map NAME_TO_OBJECT = VALID_OBJECT_TYPE.stream().collect(Collectors.toMap( diff --git a/fe/fe-core/src/main/java/com/starrocks/privilege/PrivilegeBuiltinConstants.java b/fe/fe-core/src/main/java/com/starrocks/privilege/PrivilegeBuiltinConstants.java index bddf4e704820e..e271c6aeec33f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/privilege/PrivilegeBuiltinConstants.java +++ b/fe/fe-core/src/main/java/com/starrocks/privilege/PrivilegeBuiltinConstants.java @@ -61,4 +61,5 @@ public class PrivilegeBuiltinConstants { public static final long ALL_PIPES_ID = -5; + public static final long ALL_WAREHOUSES_ID = -1; // -1 represent all warehouses } diff --git a/fe/fe-core/src/main/java/com/starrocks/privilege/PrivilegeType.java b/fe/fe-core/src/main/java/com/starrocks/privilege/PrivilegeType.java index e2787c011df76..fc58b855d12b5 100644 --- a/fe/fe-core/src/main/java/com/starrocks/privilege/PrivilegeType.java +++ b/fe/fe-core/src/main/java/com/starrocks/privilege/PrivilegeType.java @@ -77,6 +77,11 @@ public String name() { public static final PrivilegeType CREATE_STORAGE_VOLUME = new PrivilegeType(27, "CREATE STORAGE VOLUME"); public static final PrivilegeType CREATE_PIPE = new PrivilegeType(28, "CREATE PIPE"); + /** + * NOTICE: PrivilegeType cannot use a value exceeding 20000, please follow the above sequence number + */ + public static final PrivilegeType CREATE_WAREHOUSE = new PrivilegeType(20004, "CREATE WAREHOUSE"); + public static final Set VALID_PRIVILEGE_TYPE = new ImmutableSet.Builder().add( GRANT, NODE, @@ -105,7 +110,8 @@ public String name() { CREATE_RESOURCE_GROUP, CREATE_EXTERNAL_CATALOG, CREATE_STORAGE_VOLUME, - CREATE_PIPE + CREATE_PIPE, + CREATE_WAREHOUSE ).build(); public static final Map NAME_TO_PRIVILEGE = VALID_PRIVILEGE_TYPE.stream().collect(Collectors.toMap( diff --git a/fe/fe-core/src/main/java/com/starrocks/privilege/WarehousePEntryObject.java b/fe/fe-core/src/main/java/com/starrocks/privilege/WarehousePEntryObject.java new file mode 100644 index 0000000000000..3abba4b9cf223 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/privilege/WarehousePEntryObject.java @@ -0,0 +1,128 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific + +package com.starrocks.privilege; + +import com.google.gson.annotations.SerializedName; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.WarehouseManager; +import com.starrocks.sql.common.MetaNotFoundException; +import com.starrocks.warehouse.Warehouse; + +import java.util.List; +import java.util.Objects; + +public class WarehousePEntryObject implements PEntryObject { + + @SerializedName(value = "i") + private long id; + + public long getId() { + return id; + } + + public static WarehousePEntryObject generate(GlobalStateMgr mgr, + List tokens) throws PrivilegeException { + if (tokens.size() != 1) { + throw new PrivilegeException("invalid object tokens, should have only one, token: " + tokens); + } + String name = tokens.get(0); + if (name.equals("*")) { + return new WarehousePEntryObject(PrivilegeBuiltinConstants.ALL_WAREHOUSES_ID); + } else { + WarehouseManager warehouseManagerEPack = mgr.getWarehouseMgr(); + Warehouse warehouse = warehouseManagerEPack.getWarehouseAllowNull(name); + if (warehouse == null) { + throw new PrivObjNotFoundException("cannot find warehouse: " + name); + } + return new WarehousePEntryObject(warehouse.getId()); + } + } + + protected WarehousePEntryObject(long id) { + this.id = id; + } + + /** + * if the current warehouse matches other warehouse, including fuzzy matching. + *

+ * this(warehouse1), other(warehouse1) -> true

+ * this(warehouse1), other(ALL) -> true

+ * this(ALL), other(warehouse1) -> false + */ + @Override + public boolean match(Object obj) { + if (!(obj instanceof WarehousePEntryObject)) { + return false; + } + WarehousePEntryObject other = (WarehousePEntryObject) obj; + if (other.id == PrivilegeBuiltinConstants.ALL_WAREHOUSES_ID) { + return true; + } + return other.id == id; + } + + @Override + public boolean isFuzzyMatching() { + return PrivilegeBuiltinConstants.ALL_WAREHOUSES_ID == id; + } + + @Override + public boolean validate(GlobalStateMgr globalStateMgr) { + return globalStateMgr.getWarehouseMgr().getWarehouse(id) != null; + } + + @Override + public int compareTo(PEntryObject obj) { + if (!(obj instanceof WarehousePEntryObject)) { + throw new ClassCastException("cannot cast " + obj.getClass().toString() + " to " + this.getClass()); + } + WarehousePEntryObject o = (WarehousePEntryObject) obj; + return Long.compare(this.id, o.id); + } + + @Override + public PEntryObject clone() { + return new WarehousePEntryObject(id); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WarehousePEntryObject that = (WarehousePEntryObject) o; + return id == that.id; + } + + @Override + public int hashCode() { + return Objects.hash(id); + } + + @Override + public String toString() { + if (getId() == PrivilegeBuiltinConstants.ALL_WAREHOUSES_ID) { + return "ALL WAREHOUSES"; + } else { + Warehouse warehouse = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouse(getId()); + if (warehouse == null) { + throw new MetaNotFoundException("Can't find warehouse : " + id); + } + return warehouse.getName(); + } + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/privilege/ranger/starrocks/RangerStarRocksAccessController.java b/fe/fe-core/src/main/java/com/starrocks/privilege/ranger/starrocks/RangerStarRocksAccessController.java index 6bd4367db6691..b22a00eb8cd2b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/privilege/ranger/starrocks/RangerStarRocksAccessController.java +++ b/fe/fe-core/src/main/java/com/starrocks/privilege/ranger/starrocks/RangerStarRocksAccessController.java @@ -451,4 +451,15 @@ public Expr getRowAccessPolicy(ConnectContext context, TableName tableName) { public String convertToAccessType(PrivilegeType privilegeType) { return privilegeType.name().toLowerCase(ENGLISH); } + + @Override + public void checkWarehouseAction(UserIdentity currentUser, Set roleIds, String name, PrivilegeType privilegeType) + throws AccessDeniedException { + throw new AccessDeniedException(); + } + + @Override + public void checkAnyActionOnWarehouse(UserIdentity currentUser, Set roleIds, String name) throws AccessDeniedException { + throw new AccessDeniedException(); + } } \ No newline at end of file diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java index c9fa3a224c32f..d8923fc50d1ad 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java @@ -36,6 +36,7 @@ import com.starrocks.scheduler.Task; import com.starrocks.scheduler.TaskManager; import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.WarehouseManager; import com.starrocks.sql.analyzer.SemanticException; import com.starrocks.sql.ast.AdminCancelRepairTableStmt; import com.starrocks.sql.ast.AdminCheckTabletsStmt; @@ -136,6 +137,10 @@ import com.starrocks.sql.ast.pipe.AlterPipeStmt; import com.starrocks.sql.ast.pipe.CreatePipeStmt; import com.starrocks.sql.ast.pipe.DropPipeStmt; +import com.starrocks.sql.ast.warehouse.CreateWarehouseStmt; +import com.starrocks.sql.ast.warehouse.DropWarehouseStmt; +import com.starrocks.sql.ast.warehouse.ResumeWarehouseStmt; +import com.starrocks.sql.ast.warehouse.SuspendWarehouseStmt; import com.starrocks.statistic.AnalyzeJob; import com.starrocks.statistic.ExternalAnalyzeJob; import com.starrocks.statistic.NativeAnalyzeJob; @@ -1126,6 +1131,46 @@ public ShowResultSet visitCancelRefreshDictionaryStatement(CancelRefreshDictiona }); return null; } + + //=========================================== Warehouse Statement ================================================== + + @Override + public ShowResultSet visitCreateWarehouseStatement(CreateWarehouseStmt stmt, ConnectContext context) { + ErrorReport.wrapWithRuntimeException(() -> { + WarehouseManager warehouseMgr = context.getGlobalStateMgr().getWarehouseMgr(); + warehouseMgr.createWarehouse(stmt); + + }); + return null; + } + + @Override + public ShowResultSet visitSuspendWarehouseStatement(SuspendWarehouseStmt stmt, ConnectContext context) { + ErrorReport.wrapWithRuntimeException(() -> { + WarehouseManager warehouseMgr = context.getGlobalStateMgr().getWarehouseMgr(); + warehouseMgr.suspendWarehouse(stmt); + + }); + return null; + } + + @Override + public ShowResultSet visitResumeWarehouseStatement(ResumeWarehouseStmt stmt, ConnectContext context) { + ErrorReport.wrapWithRuntimeException(() -> { + WarehouseManager warehouseMgr = context.getGlobalStateMgr().getWarehouseMgr(); + warehouseMgr.resumeWarehouse(stmt); + }); + return null; + } + + @Override + public ShowResultSet visitDropWarehouseStatement(DropWarehouseStmt stmt, ConnectContext context) { + ErrorReport.wrapWithRuntimeException(() -> { + WarehouseManager warehouseMgr = context.getGlobalStateMgr().getWarehouseMgr(); + warehouseMgr.dropWarehouse(stmt); + }); + return null; + } } } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/ShowExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/ShowExecutor.java index 9e885ebf45c77..bdbc993cd263f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/ShowExecutor.java @@ -83,6 +83,7 @@ import com.starrocks.common.DdlException; import com.starrocks.common.ErrorCode; import com.starrocks.common.ErrorReport; +import com.starrocks.common.ErrorReportException; import com.starrocks.common.MetaNotFoundException; import com.starrocks.common.Pair; import com.starrocks.common.PatternMatcher; @@ -146,6 +147,7 @@ import com.starrocks.server.RunMode; import com.starrocks.server.StorageVolumeMgr; import com.starrocks.server.TemporaryTableMgr; +import com.starrocks.server.WarehouseManager; import com.starrocks.service.InformationSchemaDataSource; import com.starrocks.sql.ShowTemporaryTableStmt; import com.starrocks.sql.analyzer.AstToSQLBuilder; @@ -229,6 +231,8 @@ import com.starrocks.sql.ast.pipe.DescPipeStmt; import com.starrocks.sql.ast.pipe.PipeName; import com.starrocks.sql.ast.pipe.ShowPipeStmt; +import com.starrocks.sql.ast.warehouse.ShowNodesStmt; +import com.starrocks.sql.ast.warehouse.ShowWarehousesStmt; import com.starrocks.sql.common.MetaUtils; import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils; import com.starrocks.statistic.AnalyzeJob; @@ -242,6 +246,7 @@ import com.starrocks.thrift.TStatusCode; import com.starrocks.thrift.TTableInfo; import com.starrocks.transaction.GlobalTransactionMgr; +import com.starrocks.warehouse.Warehouse; import org.apache.commons.collections4.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -2680,6 +2685,78 @@ public ShowResultSet visitShowBackendBlackListStatement(ShowBackendBlackListStmt return new ShowResultSet(statement.getMetaData(), rows); } + @Override + public ShowResultSet visitShowWarehousesStatement(ShowWarehousesStmt statement, ConnectContext context) { + GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState(); + WarehouseManager warehouseMgr = globalStateMgr.getWarehouseMgr(); + + if (RunMode.getCurrentRunMode() == RunMode.SHARED_NOTHING) { + throw ErrorReportException.report(ErrorCode.ERR_NOT_SUPPORTED_STATEMENT_IN_SHARED_NOTHING_MODE); + } + + PatternMatcher matcher = null; + if (!statement.getPattern().isEmpty()) { + matcher = PatternMatcher.createMysqlPattern(statement.getPattern(), + CaseSensibility.WAREHOUSE.getCaseSensibility()); + } + PatternMatcher finalMatcher = matcher; + + List> rowSet = warehouseMgr.getAllWarehouses().stream() + .filter(warehouse -> finalMatcher == null || finalMatcher.match(warehouse.getName())) + .filter(warehouse -> { + try { + Authorizer.checkAnyActionOnWarehouse(context.getCurrentUserIdentity(), + context.getCurrentRoleIds(), warehouse.getName()); + } catch (AccessDeniedException e) { + return false; + } + return true; + }).sorted(Comparator.comparing(Warehouse::getId)).map(Warehouse::getWarehouseInfo) + .collect(Collectors.toList()); + return new ShowResultSet(statement.getMetaData(), rowSet); + } + + @Override + public ShowResultSet visitShowNodesStatement(ShowNodesStmt statement, ConnectContext context) { + List> rows = Lists.newArrayList(); + WarehouseManager warehouseMgr = GlobalStateMgr.getCurrentState().getWarehouseMgr(); + + // filter by pattern or warehouseName + String warehouseName = null; + PatternMatcher matcher = null; + if (statement.getWarehouseName() != null) { + warehouseName = statement.getWarehouseName(); + } else if (statement.getPattern() != null) { + matcher = PatternMatcher.createMysqlPattern(statement.getPattern(), + CaseSensibility.WAREHOUSE.getCaseSensibility()); + } + + List warehouseList = warehouseMgr.getAllWarehouses().stream().filter( + warehouse -> { + try { + Authorizer.checkAnyActionOnWarehouse(context.getCurrentUserIdentity(), + context.getCurrentRoleIds(), warehouse.getName()); + } catch (AccessDeniedException e) { + return false; + } + return true; + } + ).collect(Collectors.toList()); + + for (Warehouse wh : warehouseList) { + if (warehouseName != null && !wh.getName().equalsIgnoreCase(warehouseName)) { + continue; + } + + if (matcher != null && !matcher.match(wh.getName())) { + continue; + } + + rows.addAll(wh.getWarehouseNodesInfo()); + } + return new ShowResultSet(statement.getMetaData(), rows); + } + private List> doPredicate(ShowStmt showStmt, ShowResultSetMetaData showResultSetMetaData, List> rows) { diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java index daf0503413028..9b393a3bd38d8 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java @@ -119,6 +119,8 @@ import com.starrocks.qe.scheduler.Coordinator; import com.starrocks.qe.scheduler.FeExecuteCoordinator; import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.RunMode; +import com.starrocks.server.WarehouseManager; import com.starrocks.sql.ExplainAnalyzer; import com.starrocks.sql.PrepareStmtPlanner; import com.starrocks.sql.StatementPlanner; @@ -170,6 +172,7 @@ import com.starrocks.sql.ast.UseCatalogStmt; import com.starrocks.sql.ast.UseDbStmt; import com.starrocks.sql.ast.UserVariable; +import com.starrocks.sql.ast.warehouse.SetWarehouseStmt; import com.starrocks.sql.common.AuditEncryptionChecker; import com.starrocks.sql.common.DmlException; import com.starrocks.sql.common.ErrorType; @@ -675,6 +678,8 @@ public void execute() throws Exception { handleSetStmt(); } else if (parsedStmt instanceof UseDbStmt) { handleUseDbStmt(); + } else if (parsedStmt instanceof SetWarehouseStmt) { + handleSetWarehouseStmt(); } else if (parsedStmt instanceof UseCatalogStmt) { handleUseCatalogStmt(); } else if (parsedStmt instanceof SetCatalogStmt) { @@ -1558,6 +1563,27 @@ private void handleSetCatalogStmt() throws AnalysisException { context.getState().setOk(); } + // Process use warehouse statement + private void handleSetWarehouseStmt() throws AnalysisException { + if (RunMode.getCurrentRunMode() == RunMode.SHARED_NOTHING) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_NOT_SUPPORTED_STATEMENT_IN_SHARED_NOTHING_MODE); + } + + SetWarehouseStmt setWarehouseStmt = (SetWarehouseStmt) parsedStmt; + try { + WarehouseManager warehouseMgr = GlobalStateMgr.getCurrentState().getWarehouseMgr(); + String newWarehouseName = setWarehouseStmt.getWarehouseName(); + if (!warehouseMgr.warehouseExists(newWarehouseName)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_WAREHOUSE_ERROR, newWarehouseName); + } + context.setCurrentWarehouse(newWarehouseName); + } catch (Exception e) { + context.getState().setError(e.getMessage()); + return; + } + context.getState().setOk(); + } + private void sendMetaData(ShowResultSetMetaData metaData) throws IOException { // sends how many columns serializer.reset(); diff --git a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java index 8ff4d1cf522d7..b8bd2afce5a1a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java @@ -1533,6 +1533,7 @@ public void loadImage(String imageDir) throws IOException { .put(SRMetaBlockID.REPLICATION_MGR, replicationMgr::load) .put(SRMetaBlockID.KEY_MGR, keyMgr::load) .put(SRMetaBlockID.PIPE_MGR, pipeManager.getRepo()::load) + .put(SRMetaBlockID.WAREHOUSE_MGR, warehouseMgr::load) .build(); Set metaMgrMustExists = new HashSet<>(loadImages.keySet()); @@ -1732,6 +1733,7 @@ public void saveImage(ImageWriter imageWriter, File curFile) throws IOException replicationMgr.save(imageWriter); keyMgr.save(imageWriter); pipeManager.getRepo().save(imageWriter); + warehouseMgr.save(imageWriter); } catch (SRMetaBlockException e) { LOG.error("Save meta block failed ", e); throw new IOException("Save meta block failed ", e); diff --git a/fe/fe-core/src/main/java/com/starrocks/server/WarehouseManager.java b/fe/fe-core/src/main/java/com/starrocks/server/WarehouseManager.java index 44cc327e7c330..5279357d409b8 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/WarehouseManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/WarehouseManager.java @@ -18,6 +18,7 @@ import com.staros.client.StarClientException; import com.staros.proto.ShardInfo; import com.staros.util.LockCloseable; +import com.starrocks.common.DdlException; import com.starrocks.common.ErrorCode; import com.starrocks.common.ErrorReportException; import com.starrocks.common.UserException; @@ -25,7 +26,16 @@ import com.starrocks.common.io.Writable; import com.starrocks.lake.LakeTablet; import com.starrocks.lake.StarOSAgent; +import com.starrocks.persist.DropWarehouseLog; +import com.starrocks.persist.ImageWriter; import com.starrocks.persist.gson.GsonUtils; +import com.starrocks.persist.metablock.SRMetaBlockEOFException; +import com.starrocks.persist.metablock.SRMetaBlockException; +import com.starrocks.persist.metablock.SRMetaBlockReader; +import com.starrocks.sql.ast.warehouse.CreateWarehouseStmt; +import com.starrocks.sql.ast.warehouse.DropWarehouseStmt; +import com.starrocks.sql.ast.warehouse.ResumeWarehouseStmt; +import com.starrocks.sql.ast.warehouse.SuspendWarehouseStmt; import com.starrocks.system.ComputeNode; import com.starrocks.system.SystemInfoService; import com.starrocks.warehouse.DefaultWarehouse; @@ -260,7 +270,7 @@ public Warehouse getBackgroundWarehouse() { public Optional selectWorkerGroupByWarehouseId(long warehouseId) { Optional workerGroupId = selectWorkerGroupInternal(warehouseId); - if (workerGroupId.isEmpty()) { + if (workerGroupId.isEmpty()) { return workerGroupId; } @@ -276,7 +286,7 @@ public Optional selectWorkerGroupByWarehouseId(long warehouseId) { private Optional selectWorkerGroupInternal(long warehouseId) { Warehouse warehouse = getWarehouse(warehouseId); - if (warehouse == null) { + if (warehouse == null) { LOG.warn("failed to get warehouse by id {}", warehouseId); return Optional.empty(); } @@ -289,4 +299,39 @@ private Optional selectWorkerGroupInternal(long warehouseId) { return Optional.of(ids.get(0)); } + + public void createWarehouse(CreateWarehouseStmt stmt) throws DdlException { + throw new DdlException("Multi-Warehouse is not implemented"); + } + + public void dropWarehouse(DropWarehouseStmt stmt) throws DdlException { + throw new DdlException("Multi-Warehouse is not implemented"); + } + + public void suspendWarehouse(SuspendWarehouseStmt stmt) throws DdlException { + throw new DdlException("Multi-Warehouse is not implemented"); + } + + public void resumeWarehouse(ResumeWarehouseStmt stmt) throws DdlException { + throw new DdlException("Multi-Warehouse is not implemented"); + } + + public void replayCreateWarehouse(Warehouse warehouse) { + + } + + public void replayDropWarehouse(DropWarehouseLog log) { + + } + + public void replayAlterWarehouse(Warehouse warehouse) { + + } + + public void save(ImageWriter imageWriter) throws IOException, SRMetaBlockException { + } + + public void load(SRMetaBlockReader reader) + throws SRMetaBlockEOFException, IOException, SRMetaBlockException { + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/Analyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/Analyzer.java index e337ae4db388e..e2bd3b5a7ba01 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/Analyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/Analyzer.java @@ -152,6 +152,14 @@ import com.starrocks.sql.ast.pipe.DescPipeStmt; import com.starrocks.sql.ast.pipe.DropPipeStmt; import com.starrocks.sql.ast.pipe.ShowPipeStmt; +import com.starrocks.sql.ast.warehouse.CreateWarehouseStmt; +import com.starrocks.sql.ast.warehouse.DropWarehouseStmt; +import com.starrocks.sql.ast.warehouse.ResumeWarehouseStmt; +import com.starrocks.sql.ast.warehouse.SetWarehouseStmt; +import com.starrocks.sql.ast.warehouse.ShowClustersStmt; +import com.starrocks.sql.ast.warehouse.ShowNodesStmt; +import com.starrocks.sql.ast.warehouse.ShowWarehousesStmt; +import com.starrocks.sql.ast.warehouse.SuspendWarehouseStmt; public class Analyzer { private final AnalyzerVisitor analyzerVisitor; @@ -1020,5 +1028,52 @@ public Void visitShowDictionaryStatement(ShowDictionaryStmt statement, ConnectCo DictionaryAnalyzer.analyze(statement, context); return null; } + + // ---------------------------------------- Warehouse Statement --------------------------------------------------- + + @Override + public Void visitShowWarehousesStatement(ShowWarehousesStmt statement, ConnectContext context) { + return null; + } + + @Override + public Void visitShowClusterStatement(ShowClustersStmt statement, ConnectContext context) { + return null; + } + + @Override + public Void visitCreateWarehouseStatement(CreateWarehouseStmt statement, ConnectContext context) { + WarehouseAnalyzer.analyze(statement, context); + return null; + } + + @Override + public Void visitDropWarehouseStatement(DropWarehouseStmt statement, ConnectContext context) { + WarehouseAnalyzer.analyze(statement, context); + return null; + } + + @Override + public Void visitSuspendWarehouseStatement(SuspendWarehouseStmt statement, ConnectContext context) { + WarehouseAnalyzer.analyze(statement, context); + return null; + } + + @Override + public Void visitResumeWarehouseStatement(ResumeWarehouseStmt statement, ConnectContext context) { + WarehouseAnalyzer.analyze(statement, context); + return null; + } + + @Override + public Void visitSetWarehouseStatement(SetWarehouseStmt statement, ConnectContext context) { + WarehouseAnalyzer.analyze(statement, context); + return null; + } + + @Override + public Void visitShowNodesStatement(ShowNodesStmt statement, ConnectContext context) { + return null; + } } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AstToStringBuilder.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AstToStringBuilder.java index 20518fdf1121e..1b84d08aaa827 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AstToStringBuilder.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AstToStringBuilder.java @@ -265,8 +265,8 @@ public String visitGrantRevokePrivilegeStatement(BaseGrantRevokePrivilegeStmt st sb.append(stmt.getObjectType().name()).append(" "); List objectString = new ArrayList<>(); - for (PEntryObject tablePEntryObject : stmt.getObjectList()) { - objectString.add(tablePEntryObject.toString()); + for (PEntryObject pEntryObject : stmt.getObjectList()) { + objectString.add(pEntryObject.toString()); } sb.append(Joiner.on(", ").join(objectString)); } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/Authorizer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/Authorizer.java index 2f55544e1d5e6..b3f95d2901614 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/Authorizer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/Authorizer.java @@ -34,9 +34,11 @@ import com.starrocks.qe.ConnectContext; import com.starrocks.server.CatalogMgr; import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.WarehouseManager; import com.starrocks.sql.ast.StatementBase; import com.starrocks.sql.ast.UserIdentity; import com.starrocks.sql.ast.pipe.PipeName; +import com.starrocks.warehouse.Warehouse; import org.apache.commons.collections4.ListUtils; import java.util.List; @@ -414,4 +416,20 @@ public static Pair checkPrivForShowTablet(ConnectContext conte } } } + + public static void checkWarehouseAction(UserIdentity currentUser, Set roleIds, String name, + PrivilegeType privilegeType) throws AccessDeniedException { + getInstance().getAccessControlOrDefault(InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME) + .checkWarehouseAction(currentUser, roleIds, name, privilegeType); + } + + public static void checkAnyActionOnWarehouse(UserIdentity currentUser, Set roleIds, String name) + throws AccessDeniedException { + // Any user has an implicit usage permission on the default_warehouse + Warehouse warehouse = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouse(name); + if (warehouse.getId() != WarehouseManager.DEFAULT_WAREHOUSE_ID) { + getInstance().getAccessControlOrDefault(InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME) + .checkAnyActionOnWarehouse(currentUser, roleIds, name); + } + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AuthorizerStmtVisitor.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AuthorizerStmtVisitor.java index 0bdd27055b85e..e83ceeb68940f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AuthorizerStmtVisitor.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AuthorizerStmtVisitor.java @@ -16,6 +16,8 @@ import com.google.common.collect.Lists; import com.starrocks.analysis.FunctionName; +import com.starrocks.analysis.HintNode; +import com.starrocks.analysis.SetVarHint; import com.starrocks.analysis.TableName; import com.starrocks.analysis.TableRef; import com.starrocks.backup.AbstractJob; @@ -45,8 +47,10 @@ import com.starrocks.privilege.PrivilegeException; import com.starrocks.privilege.PrivilegeType; import com.starrocks.qe.ConnectContext; +import com.starrocks.qe.SessionVariable; import com.starrocks.server.CatalogMgr; import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.WarehouseManager; import com.starrocks.sql.ast.AddBackendBlackListStmt; import com.starrocks.sql.ast.AddSqlBlackListStmt; import com.starrocks.sql.ast.AdminCancelRepairTableStmt; @@ -140,6 +144,7 @@ import com.starrocks.sql.ast.RestoreStmt; import com.starrocks.sql.ast.ResumeRoutineLoadStmt; import com.starrocks.sql.ast.RevokeRoleStmt; +import com.starrocks.sql.ast.SelectRelation; import com.starrocks.sql.ast.SetCatalogStmt; import com.starrocks.sql.ast.SetDefaultRoleStmt; import com.starrocks.sql.ast.SetDefaultStorageVolumeStmt; @@ -206,7 +211,15 @@ import com.starrocks.sql.ast.pipe.DropPipeStmt; import com.starrocks.sql.ast.pipe.PipeName; import com.starrocks.sql.ast.pipe.ShowPipeStmt; +import com.starrocks.sql.ast.warehouse.CreateWarehouseStmt; +import com.starrocks.sql.ast.warehouse.DropWarehouseStmt; +import com.starrocks.sql.ast.warehouse.ResumeWarehouseStmt; +import com.starrocks.sql.ast.warehouse.SetWarehouseStmt; +import com.starrocks.sql.ast.warehouse.ShowClustersStmt; +import com.starrocks.sql.ast.warehouse.ShowWarehousesStmt; +import com.starrocks.sql.ast.warehouse.SuspendWarehouseStmt; import com.starrocks.sql.common.MetaUtils; +import org.apache.commons.collections4.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -235,6 +248,28 @@ public void check(StatementBase statement, ConnectContext context) { @Override public Void visitQueryStatement(QueryStatement statement, ConnectContext context) { checkSelectTableAction(context, statement, Lists.newArrayList()); + + List hintNodes = null; + if (statement.getQueryRelation() instanceof SelectRelation) { + SelectRelation selectRelation = (SelectRelation) statement.getQueryRelation(); + hintNodes = selectRelation.getSelectList().getHintNodes(); + } + + if (CollectionUtils.isNotEmpty(hintNodes)) { + for (HintNode hintNode : hintNodes) { + if (hintNode instanceof SetVarHint) { + Map optHints = hintNode.getValue(); + if (optHints.containsKey(SessionVariable.WAREHOUSE_NAME)) { + // check warehouse privilege + String warehouseName = optHints.get(SessionVariable.WAREHOUSE_NAME); + if (!warehouseName.equalsIgnoreCase(WarehouseManager.DEFAULT_WAREHOUSE_NAME)) { + checkWarehouseUsagePrivilege(warehouseName, context); + } + } + } + } + } + return null; } @@ -302,6 +337,13 @@ public Void visitCreateRoutineLoadStatement(CreateRoutineLoadStmt statement, Con context.getCurrentUserIdentity(), context.getCurrentRoleIds(), PrivilegeType.INSERT.name(), ObjectType.TABLE.name(), statement.getTableName()); } + + // check warehouse privilege + Map properties = statement.getJobProperties(); + if (properties != null && properties.containsKey(PropertyAnalyzer.PROPERTIES_WAREHOUSE)) { + String warehouseName = properties.get(PropertyAnalyzer.PROPERTIES_WAREHOUSE); + checkWarehouseUsagePrivilege(warehouseName, context); + } return null; } @@ -317,6 +359,13 @@ public Void visitAlterRoutineLoadStatement(AlterRoutineLoadStmt statement, Conne context.getCurrentUserIdentity(), context.getCurrentRoleIds(), PrivilegeType.INSERT.name(), ObjectType.TABLE.name(), tableName); } + + // check warehouse privilege + Map properties = statement.getJobProperties(); + if (properties != null && properties.containsKey(PropertyAnalyzer.PROPERTIES_WAREHOUSE)) { + String warehouseName = properties.get(PropertyAnalyzer.PROPERTIES_WAREHOUSE); + checkWarehouseUsagePrivilege(warehouseName, context); + } return null; } @@ -422,6 +471,13 @@ public Void visitLoadStatement(LoadStmt statement, ConnectContext context) { PrivilegeType.INSERT.name(), ObjectType.TABLE.name(), tableName); } }); + + // check warehouse privilege + Map properties = statement.getProperties(); + if (properties != null && properties.containsKey(PropertyAnalyzer.PROPERTIES_WAREHOUSE)) { + String warehouseName = properties.get(PropertyAnalyzer.PROPERTIES_WAREHOUSE); + checkWarehouseUsagePrivilege(warehouseName, context); + } return null; } @@ -2216,6 +2272,14 @@ public Void visitCreateMaterializedViewStatement(CreateMaterializedViewStatement InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME, statement.getTableName().getDb(), PrivilegeType.CREATE_MATERIALIZED_VIEW); visitQueryStatement(statement.getQueryStatement(), context); + + // check warehouse privilege + Map properties = statement.getProperties(); + if (properties != null && properties.containsKey(PropertyAnalyzer.PROPERTIES_WAREHOUSE)) { + String warehouseName = properties.get(PropertyAnalyzer.PROPERTIES_WAREHOUSE); + checkWarehouseUsagePrivilege(warehouseName, context); + } + } catch (AccessDeniedException e) { AccessDeniedException.reportAccessDenied( InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME, @@ -2515,6 +2579,88 @@ public Void visitCancelCompactionStatement(CancelCompactionStmt statement, Conne return null; } + // --------------------------------- Warehouse Statement --------------------------------- + @Override + public Void visitCreateWarehouseStatement(CreateWarehouseStmt statement, ConnectContext context) { + try { + Authorizer.checkSystemAction(context.getCurrentUserIdentity(), context.getCurrentRoleIds(), + PrivilegeType.CREATE_WAREHOUSE); + } catch (AccessDeniedException e) { + AccessDeniedException.reportAccessDenied(InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME, + context.getCurrentUserIdentity(), context.getCurrentRoleIds(), + PrivilegeType.CREATE_WAREHOUSE.name(), ObjectType.SYSTEM.name(), null); + } + return null; + } + + @Override + public Void visitSuspendWarehouseStatement(SuspendWarehouseStmt statement, ConnectContext context) { + try { + Authorizer.checkWarehouseAction(context.getCurrentUserIdentity(), + context.getCurrentRoleIds(), statement.getWarehouseName(), PrivilegeType.ALTER); + } catch (AccessDeniedException e) { + AccessDeniedException.reportAccessDenied(InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME, + context.getCurrentUserIdentity(), context.getCurrentRoleIds(), + PrivilegeType.ALTER.name(), ObjectType.WAREHOUSE.name(), statement.getWarehouseName()); + } + return null; + } + + @Override + public Void visitResumeWarehouseStatement(ResumeWarehouseStmt statement, ConnectContext context) { + try { + Authorizer.checkWarehouseAction(context.getCurrentUserIdentity(), + context.getCurrentRoleIds(), statement.getWarehouseName(), PrivilegeType.ALTER); + } catch (AccessDeniedException e) { + AccessDeniedException.reportAccessDenied(InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME, + context.getCurrentUserIdentity(), context.getCurrentRoleIds(), + PrivilegeType.ALTER.name(), ObjectType.WAREHOUSE.name(), statement.getWarehouseName()); + } + return null; + } + + public Void visitDropWarehouseStatement(DropWarehouseStmt statement, ConnectContext context) { + try { + Authorizer.checkWarehouseAction(context.getCurrentUserIdentity(), + context.getCurrentRoleIds(), statement.getWarehouseName(), PrivilegeType.DROP); + } catch (AccessDeniedException e) { + AccessDeniedException.reportAccessDenied(InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME, + context.getCurrentUserIdentity(), context.getCurrentRoleIds(), + PrivilegeType.DROP.name(), ObjectType.WAREHOUSE.name(), statement.getWarehouseName()); + } + return null; + } + + public Void visitSetWarehouseStatement(SetWarehouseStmt statement, ConnectContext context) { + try { + Authorizer.checkWarehouseAction(context.getCurrentUserIdentity(), + context.getCurrentRoleIds(), statement.getWarehouseName(), PrivilegeType.USAGE); + } catch (AccessDeniedException e) { + AccessDeniedException.reportAccessDenied(InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME, + context.getCurrentUserIdentity(), context.getCurrentRoleIds(), + PrivilegeType.USAGE.name(), ObjectType.WAREHOUSE.name(), statement.getWarehouseName()); + } + return null; + } + + public Void visitShowWarehousesStatement(ShowWarehousesStmt statement, ConnectContext context) { + // `show warehouses` only show warehouses that user has any privilege on, we will check it in + // the execution logic, not here, see `handleShowWarehouses()` for details. + return null; + } + + public Void visitShowClusterStatement(ShowClustersStmt statement, ConnectContext context) { + try { + Authorizer.checkAnyActionOnWarehouse(context.getCurrentUserIdentity(), + context.getCurrentRoleIds(), statement.getWarehouseName()); + } catch (AccessDeniedException e) { + AccessDeniedException.reportAccessDenied(InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME, + context.getCurrentUserIdentity(), context.getCurrentRoleIds(), + PrivilegeType.ANY.name(), ObjectType.WAREHOUSE.name(), statement.getWarehouseName()); + } + return null; + } + private String getTableNameByRoutineLoadLabel(ConnectContext context, String dbName, String labelName) { RoutineLoadJob job = null; @@ -2572,4 +2718,15 @@ private void checkOperateLoadPrivilege(ConnectContext context, String dbName, St } }); } + + private void checkWarehouseUsagePrivilege(String warehouseName, ConnectContext context) { + try { + Authorizer.checkWarehouseAction(context.getCurrentUserIdentity(), + context.getCurrentRoleIds(), warehouseName, PrivilegeType.USAGE); + } catch (AccessDeniedException e) { + AccessDeniedException.reportAccessDenied(InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME, + context.getCurrentUserIdentity(), context.getCurrentRoleIds(), + PrivilegeType.USAGE.name(), ObjectType.WAREHOUSE.name(), warehouseName); + } + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/PrivilegeStmtAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/PrivilegeStmtAnalyzer.java index d65c73d1e12fa..e9d98e436cf20 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/PrivilegeStmtAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/PrivilegeStmtAnalyzer.java @@ -449,7 +449,8 @@ public List> analyzeTokens(BaseGrantRevokePrivilegeStmt stmt, Objec } else if (ObjectType.RESOURCE.equals(objectType) || ObjectType.CATALOG.equals(objectType) || ObjectType.RESOURCE_GROUP.equals(objectType) - || ObjectType.STORAGE_VOLUME.equals(objectType)) { + || ObjectType.STORAGE_VOLUME.equals(objectType) + || ObjectType.WAREHOUSE.equals(objectType)) { if (tokens.size() != 1) { throw new SemanticException( "Invalid grant statement with error privilege object " + tokens); @@ -528,7 +529,8 @@ public List> analyzeTokens(BaseGrantRevokePrivilegeStmt stmt, Objec } else if (ObjectType.RESOURCE.equals(objectType) || ObjectType.CATALOG.equals(objectType) || ObjectType.RESOURCE_GROUP.equals(objectType) - || ObjectType.STORAGE_VOLUME.equals(objectType)) { + || ObjectType.STORAGE_VOLUME.equals(objectType) + || ObjectType.WAREHOUSE.equals(objectType)) { for (List tokens : stmt.getPrivilegeObjectNameTokensList()) { if (tokens.size() != 1) { throw new SemanticException( diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/WarehouseAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/WarehouseAnalyzer.java new file mode 100644 index 0000000000000..238d6163cbe38 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/WarehouseAnalyzer.java @@ -0,0 +1,99 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.sql.analyzer; + +import com.google.common.base.Strings; +import com.starrocks.common.ErrorCode; +import com.starrocks.common.ErrorReport; +import com.starrocks.qe.ConnectContext; +import com.starrocks.server.WarehouseManager; +import com.starrocks.sql.ast.AstVisitor; +import com.starrocks.sql.ast.ShowStmt; +import com.starrocks.sql.ast.StatementBase; +import com.starrocks.sql.ast.warehouse.CreateWarehouseStmt; +import com.starrocks.sql.ast.warehouse.DropWarehouseStmt; +import com.starrocks.sql.ast.warehouse.ResumeWarehouseStmt; +import com.starrocks.sql.ast.warehouse.SetWarehouseStmt; +import com.starrocks.sql.ast.warehouse.ShowWarehousesStmt; +import com.starrocks.sql.ast.warehouse.SuspendWarehouseStmt; + +public class WarehouseAnalyzer { + public static void analyze(StatementBase stmt, ConnectContext session) { + new WarehouseAnalyzerVisitor().visit(stmt, session); + } + + static class WarehouseAnalyzerVisitor implements AstVisitor { + public void analyze(ShowStmt statement, ConnectContext session) { + visit(statement, session); + } + + @Override + public Void visitCreateWarehouseStatement(CreateWarehouseStmt statement, ConnectContext context) { + String whName = statement.getWarehouseName(); + if (Strings.isNullOrEmpty(whName)) { + ErrorReport.reportSemanticException(ErrorCode.ERR_INVALID_WAREHOUSE_NAME); + } + FeNameFormat.checkWarehouseName(whName); + return null; + } + + @Override + public Void visitSuspendWarehouseStatement(SuspendWarehouseStmt statement, ConnectContext context) { + String whName = statement.getWarehouseName(); + if (Strings.isNullOrEmpty(whName)) { + ErrorReport.reportSemanticException(ErrorCode.ERR_INVALID_WAREHOUSE_NAME); + } + return null; + } + + public Void visitResumeWarehouseStatement(ResumeWarehouseStmt statement, ConnectContext context) { + String whName = statement.getWarehouseName(); + if (Strings.isNullOrEmpty(whName)) { + ErrorReport.reportSemanticException(ErrorCode.ERR_INVALID_WAREHOUSE_NAME); + } + return null; + } + + @Override + public Void visitDropWarehouseStatement(DropWarehouseStmt statement, ConnectContext context) { + String whName = statement.getWarehouseName(); + if (Strings.isNullOrEmpty(whName)) { + ErrorReport.reportSemanticException(ErrorCode.ERR_INVALID_WAREHOUSE_NAME); + } + + if (whName.equals(WarehouseManager.DEFAULT_WAREHOUSE_NAME)) { + throw new SemanticException("Can't drop the default_warehouse"); + } + + return null; + } + + @Override + public Void visitSetWarehouseStatement(SetWarehouseStmt statement, ConnectContext context) { + String whName = statement.getWarehouseName(); + if (Strings.isNullOrEmpty(whName)) { + ErrorReport.reportSemanticException(ErrorCode.ERR_INVALID_WAREHOUSE_NAME); + } + FeNameFormat.checkWarehouseName(whName); + return null; + } + + @Override + public Void visitShowWarehousesStatement(ShowWarehousesStmt node, ConnectContext context) { + return null; + } + } + +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/AddBackendClause.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/AddBackendClause.java index e4ebe1a7f2d8b..68fefd3d0b468 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/ast/AddBackendClause.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/AddBackendClause.java @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. - package com.starrocks.sql.ast; import com.starrocks.sql.parser.NodePosition; @@ -20,13 +19,20 @@ import java.util.List; public class AddBackendClause extends BackendClause { + private final String warehouse; - public AddBackendClause(List hostPorts) { + public AddBackendClause(List hostPorts, String warehouse) { super(hostPorts, NodePosition.ZERO); + this.warehouse = warehouse; } - public AddBackendClause(List hostPorts, NodePosition pos) { + public AddBackendClause(List hostPorts, String warehouse, NodePosition pos) { super(hostPorts, pos); + this.warehouse = warehouse; + } + + public String getWarehouse() { + return warehouse; } @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/AddComputeNodeClause.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/AddComputeNodeClause.java index 1e4e7450b15f4..500142f0705d4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/ast/AddComputeNodeClause.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/AddComputeNodeClause.java @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. - package com.starrocks.sql.ast; import com.starrocks.sql.parser.NodePosition; @@ -20,13 +19,15 @@ import java.util.List; public class AddComputeNodeClause extends ComputeNodeClause { + private final String warehouse; - public AddComputeNodeClause(List hostPorts) { - this(hostPorts, NodePosition.ZERO); + public AddComputeNodeClause(List hostPorts, String warehouse, NodePosition pos) { + super(hostPorts, pos); + this.warehouse = warehouse; } - public AddComputeNodeClause(List hostPorts, NodePosition pos) { - super(hostPorts, pos); + public String getWarehouse() { + return warehouse; } @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/AstVisitor.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/AstVisitor.java index ff96abf35c89d..79d614a6b7c5c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/ast/AstVisitor.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/AstVisitor.java @@ -61,6 +61,14 @@ import com.starrocks.sql.ast.pipe.DropPipeStmt; import com.starrocks.sql.ast.pipe.PipeName; import com.starrocks.sql.ast.pipe.ShowPipeStmt; +import com.starrocks.sql.ast.warehouse.CreateWarehouseStmt; +import com.starrocks.sql.ast.warehouse.DropWarehouseStmt; +import com.starrocks.sql.ast.warehouse.ResumeWarehouseStmt; +import com.starrocks.sql.ast.warehouse.SetWarehouseStmt; +import com.starrocks.sql.ast.warehouse.ShowClustersStmt; +import com.starrocks.sql.ast.warehouse.ShowNodesStmt; +import com.starrocks.sql.ast.warehouse.ShowWarehousesStmt; +import com.starrocks.sql.ast.warehouse.SuspendWarehouseStmt; public interface AstVisitor { default R visit(ParseNode node) { @@ -673,7 +681,6 @@ default R visitShowTriggersStatement(ShowTriggersStmt statement, C context) { return visitShowStatement(statement, context); } - // ---------------------------------------- Authz Statement ---------------------------------------------------- default R visitBaseCreateAlterUserStmt(BaseCreateAlterUserStmt statement, C context) { @@ -979,6 +986,40 @@ default R visitCancelRefreshDictionaryStatement(CancelRefreshDictionaryStmt clau return visitDDLStatement(clause, context); } + // ---------------------------------------- Warehouse Statement ---------------------------------------------------- + + default R visitShowWarehousesStatement(ShowWarehousesStmt statement, C context) { + return visitShowStatement(statement, context); + } + + default R visitShowClusterStatement(ShowClustersStmt statement, C context) { + return visitShowStatement(statement, context); + } + + default R visitCreateWarehouseStatement(CreateWarehouseStmt statement, C context) { + return visitDDLStatement(statement, context); + } + + default R visitDropWarehouseStatement(DropWarehouseStmt statement, C context) { + return visitDDLStatement(statement, context); + } + + default R visitSuspendWarehouseStatement(SuspendWarehouseStmt statement, C context) { + return visitDDLStatement(statement, context); + } + + default R visitResumeWarehouseStatement(ResumeWarehouseStmt statement, C context) { + return visitDDLStatement(statement, context); + } + + default R visitSetWarehouseStatement(SetWarehouseStmt statement, C context) { + return visitStatement(statement, context); + } + + default R visitShowNodesStatement(ShowNodesStmt statement, C context) { + return visitShowStatement(statement, context); + } + // ------------------------------------------- Unsupported statement --------------------------------------------------------- default R visitUnsupportedStatement(UnsupportedStmt statement, C context) { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/DropBackendClause.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/DropBackendClause.java index 7cf5469ec3e16..5a2e4c392b41e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/ast/DropBackendClause.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/DropBackendClause.java @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. - package com.starrocks.sql.ast; import com.starrocks.sql.parser.NodePosition; @@ -21,25 +20,26 @@ public class DropBackendClause extends BackendClause { private final boolean force; + public String warehouse; - public DropBackendClause(List hostPorts) { - super(hostPorts, NodePosition.ZERO); - this.force = true; - } - - public DropBackendClause(List hostPorts, boolean force) { - this(hostPorts, force, NodePosition.ZERO); + public DropBackendClause(List hostPorts, boolean force, String warehouse) { + this(hostPorts, force, warehouse, NodePosition.ZERO); } - public DropBackendClause(List hostPorts, boolean force, NodePosition pos) { + public DropBackendClause(List hostPorts, boolean force, String warehouse, NodePosition pos) { super(hostPorts, pos); this.force = force; + this.warehouse = warehouse; } public boolean isForce() { return force; } + public String getWarehouse() { + return warehouse; + } + @Override public R accept(AstVisitor visitor, C context) { return visitor.visitDropBackendClause(this, context); diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/DropComputeNodeClause.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/DropComputeNodeClause.java index 9a49999cb6dc2..00f9742c845ac 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/ast/DropComputeNodeClause.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/DropComputeNodeClause.java @@ -19,13 +19,19 @@ import java.util.List; public class DropComputeNodeClause extends ComputeNodeClause { + public String warehouse; - public DropComputeNodeClause(List hostPorts) { - this(hostPorts, NodePosition.ZERO); + public DropComputeNodeClause(List hostPorts, String warehouse) { + this(hostPorts, warehouse, NodePosition.ZERO); } - public DropComputeNodeClause(List hostPorts, NodePosition pos) { + public DropComputeNodeClause(List hostPorts, String warehouse, NodePosition pos) { super(hostPorts, pos); + this.warehouse = warehouse; + } + + public String getWarehouse() { + return warehouse; } @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/CreateWarehouseStmt.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/CreateWarehouseStmt.java new file mode 100644 index 0000000000000..f34c7e6cd0e5a --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/CreateWarehouseStmt.java @@ -0,0 +1,87 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.sql.ast.warehouse; + +import com.starrocks.common.util.PrintableMap; +import com.starrocks.sql.ast.AstVisitor; +import com.starrocks.sql.ast.DdlStmt; +import com.starrocks.sql.parser.NodePosition; + +import java.util.Map; + +public class CreateWarehouseStmt extends DdlStmt { + private final boolean ifNotExists; + private final String warehouseName; + private Map properties; + private final String comment; + + public CreateWarehouseStmt(boolean ifNotExists, + String warehouseName, + Map properties, + String comment) { + this(ifNotExists, warehouseName, properties, comment, NodePosition.ZERO); + } + + public CreateWarehouseStmt(boolean ifNotExists, String warehouseName, + Map properties, + String comment, + NodePosition pos) { + super(pos); + this.ifNotExists = ifNotExists; + this.warehouseName = warehouseName; + this.properties = properties; + this.comment = comment; + } + + public String getWarehouseName() { + return warehouseName; + } + + public boolean isSetIfNotExists() { + return ifNotExists; + } + + public Map getProperties() { + return this.properties; + } + + public String getComment() { + return comment; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitCreateWarehouseStatement(this, context); + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("CREATE WAREHOUSE "); + if (ifNotExists) { + sb.append("IF NOT EXISTS "); + } + sb.append("'").append(warehouseName).append("' "); + if (comment != null) { + sb.append("COMMENT \"").append(comment).append("\" "); + } + sb.append("WITH PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")"); + return sb.toString(); + } +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/DropWarehouseStmt.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/DropWarehouseStmt.java new file mode 100644 index 0000000000000..7b39037b7dae2 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/DropWarehouseStmt.java @@ -0,0 +1,59 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.sql.ast.warehouse; + +import com.starrocks.sql.ast.AstVisitor; +import com.starrocks.sql.ast.DdlStmt; +import com.starrocks.sql.parser.NodePosition; + +public class DropWarehouseStmt extends DdlStmt { + private final boolean ifExists; + private final String warehouseName; + + public DropWarehouseStmt(boolean ifExists, String warehouseName) { + this(ifExists, warehouseName, NodePosition.ZERO); + } + + public DropWarehouseStmt(boolean ifExists, String warehouseName, NodePosition pos) { + super(pos); + this.ifExists = ifExists; + this.warehouseName = warehouseName; + } + + public String getWarehouseName() { + return warehouseName; + } + + public boolean isSetIfExists() { + return ifExists; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitDropWarehouseStatement(this, context); + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("DROP WAREHOUSE "); + if (ifExists) { + sb.append("IF EXISTS "); + } + sb.append("\'" + warehouseName + "\'"); + return sb.toString(); + } + +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/ResumeWarehouseStmt.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/ResumeWarehouseStmt.java new file mode 100644 index 0000000000000..44f540972215b --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/ResumeWarehouseStmt.java @@ -0,0 +1,41 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.sql.ast.warehouse; + +import com.starrocks.sql.ast.AstVisitor; +import com.starrocks.sql.ast.DdlStmt; +import com.starrocks.sql.parser.NodePosition; + +public class ResumeWarehouseStmt extends DdlStmt { + private String warehouseName; + + public ResumeWarehouseStmt(String warehouseName) { + this(warehouseName, NodePosition.ZERO); + } + + public ResumeWarehouseStmt(String warehouseName, NodePosition pos) { + super(pos); + this.warehouseName = warehouseName; + } + + public String getWarehouseName() { + return warehouseName; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitResumeWarehouseStatement(this, context); + } +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/SetWarehouseStmt.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/SetWarehouseStmt.java new file mode 100644 index 0000000000000..1b0eae203ce75 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/SetWarehouseStmt.java @@ -0,0 +1,47 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.sql.ast.warehouse; + +import com.starrocks.analysis.RedirectStatus; +import com.starrocks.sql.ast.AstVisitor; +import com.starrocks.sql.ast.StatementBase; +import com.starrocks.sql.parser.NodePosition; + +public class SetWarehouseStmt extends StatementBase { + private final String warehouseName; + + public SetWarehouseStmt(String warehouseName) { + this(warehouseName, NodePosition.ZERO); + } + + public SetWarehouseStmt(String warehouseName, NodePosition pos) { + super(pos); + this.warehouseName = warehouseName; + } + + public String getWarehouseName() { + return warehouseName; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitSetWarehouseStatement(this, context); + } + + @Override + public RedirectStatus getRedirectStatus() { + return RedirectStatus.NO_FORWARD; + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/ShowClustersStmt.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/ShowClustersStmt.java new file mode 100644 index 0000000000000..f29b2603defa5 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/ShowClustersStmt.java @@ -0,0 +1,58 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.sql.ast.warehouse; + +import com.starrocks.catalog.Column; +import com.starrocks.catalog.ScalarType; +import com.starrocks.qe.ShowResultSetMetaData; +import com.starrocks.sql.ast.AstVisitor; +import com.starrocks.sql.ast.ShowStmt; +import com.starrocks.sql.parser.NodePosition; + +// show clusters of a warehouse +public class ShowClustersStmt extends ShowStmt { + private String warehouseName; + private static final ShowResultSetMetaData META_DATA = + ShowResultSetMetaData.builder() + .addColumn(new Column("ClusterId", ScalarType.createVarchar(20))) + .addColumn(new Column("WorkerGroupId", ScalarType.createVarchar(20))) + .addColumn(new Column("ComputeNodeIds", ScalarType.createVarchar(256))) + .addColumn(new Column("Pending", ScalarType.createVarchar(20))) + .addColumn(new Column("Running", ScalarType.createVarchar(20))) + .build(); + + public ShowClustersStmt(String warehouseName) { + this(warehouseName, NodePosition.ZERO); + } + + public ShowClustersStmt(String warehouseName, NodePosition pos) { + super(pos); + this.warehouseName = warehouseName; + } + + public String getWarehouseName() { + return warehouseName; + } + + @Override + public ShowResultSetMetaData getMetaData() { + return META_DATA; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitShowClusterStatement(this, context); + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/ShowNodesStmt.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/ShowNodesStmt.java new file mode 100644 index 0000000000000..03dda259ab6b3 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/ShowNodesStmt.java @@ -0,0 +1,75 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific + +package com.starrocks.sql.ast.warehouse; + +import com.starrocks.catalog.Column; +import com.starrocks.catalog.ScalarType; +import com.starrocks.qe.ShowResultSetMetaData; +import com.starrocks.sql.ast.AstVisitor; +import com.starrocks.sql.ast.ShowStmt; +import com.starrocks.sql.parser.NodePosition; + +// Show nodes from warehouse statement +public class ShowNodesStmt extends ShowStmt { + private static final ShowResultSetMetaData META_DATA = + ShowResultSetMetaData.builder() + .addColumn(new Column("WarehouseName", ScalarType.createVarchar(256))) + .addColumn(new Column("ClusterId", ScalarType.createVarchar(20))) + .addColumn(new Column("WorkerGroupId", ScalarType.createVarchar(20))) + .addColumn(new Column("NodeId", ScalarType.createVarchar(20))) + .addColumn(new Column("WorkerId", ScalarType.createVarchar(20))) + .addColumn(new Column("IP", ScalarType.createVarchar(256))) + .addColumn(new Column("HeartbeatPort", ScalarType.createVarchar(20))) + .addColumn(new Column("BePort", ScalarType.createVarchar(20))) + .addColumn(new Column("HttpPort", ScalarType.createVarchar(20))) + .addColumn(new Column("BrpcPort", ScalarType.createVarchar(20))) + .addColumn(new Column("StarletPort", ScalarType.createVarchar(20))) + .addColumn(new Column("LastStartTime", ScalarType.createVarchar(256))) + .addColumn(new Column("LastUpdateMs", ScalarType.createVarchar(256))) + .addColumn(new Column("Alive", ScalarType.createVarchar(20))) + .addColumn(new Column("ErrMsg", ScalarType.createVarchar(256))) + .addColumn(new Column("Version", ScalarType.createVarchar(20))) + .addColumn(new Column("NumRunningQueries", ScalarType.createVarchar(20))) + .addColumn(new Column("CpuCores", ScalarType.createVarchar(20))) + .addColumn(new Column("MemUsedPct", ScalarType.createVarchar(20))) + .addColumn(new Column("CpuUsedPct", ScalarType.createVarchar(20))) + .build(); + + private final String pattern; + private final String warehouseName; + + public ShowNodesStmt(String warehouseName, String pattern, NodePosition pos) { + super(pos); + this.warehouseName = warehouseName; + this.pattern = pattern; + } + + public String getPattern() { + return pattern; + } + + public String getWarehouseName() { + return warehouseName; + } + + @Override + public ShowResultSetMetaData getMetaData() { + return META_DATA; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitShowNodesStatement(this, context); + } +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/ShowWarehousesStmt.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/ShowWarehousesStmt.java new file mode 100644 index 0000000000000..ac9166271cdd3 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/ShowWarehousesStmt.java @@ -0,0 +1,78 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.sql.ast.warehouse; + +import com.google.common.base.Strings; +import com.starrocks.catalog.Column; +import com.starrocks.catalog.ScalarType; +import com.starrocks.qe.ShowResultSetMetaData; +import com.starrocks.sql.ast.AstVisitor; +import com.starrocks.sql.ast.ShowStmt; +import com.starrocks.sql.parser.NodePosition; + +// Show warehouse statement. +public class ShowWarehousesStmt extends ShowStmt { + private static final ShowResultSetMetaData META_DATA = + ShowResultSetMetaData.builder() + .addColumn(new Column("Id", ScalarType.createVarchar(20))) + .addColumn(new Column("Name", ScalarType.createVarchar(256))) + .addColumn(new Column("State", ScalarType.createVarchar(20))) + .addColumn(new Column("NodeCount", ScalarType.createVarchar(20))) + .addColumn(new Column("CurrentClusterCount", ScalarType.createVarchar(20))) + .addColumn(new Column("MaxClusterCount", ScalarType.createVarchar(20))) + .addColumn(new Column("StartedClusters", ScalarType.createVarchar(20))) + .addColumn(new Column("RunningSql", ScalarType.createVarchar(20))) + .addColumn(new Column("QueuedSql", ScalarType.createVarchar(20))) + .addColumn(new Column("CreatedOn", ScalarType.createVarchar(20))) + .addColumn(new Column("ResumedOn", ScalarType.createVarchar(20))) + .addColumn(new Column("UpdatedOn", ScalarType.createVarchar(20))) + .addColumn(new Column("Comment", ScalarType.createVarchar(256))) + .build(); + private final String pattern; + + public ShowWarehousesStmt(String pattern) { + this(pattern, NodePosition.ZERO); + } + + public ShowWarehousesStmt(String pattern, NodePosition pos) { + super(pos); + this.pattern = Strings.nullToEmpty(pattern); + } + + public String getPattern() { + return pattern; + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("SHOW WAREHOUSES"); + if (!pattern.isEmpty()) { + sb.append(" LIKE '").append(pattern).append("'"); + } + return sb.toString(); + } + + @Override + public ShowResultSetMetaData getMetaData() { + return META_DATA; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitShowWarehousesStatement(this, context); + } +} + diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/SuspendWarehouseStmt.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/SuspendWarehouseStmt.java new file mode 100644 index 0000000000000..886b08bd5e5c8 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/warehouse/SuspendWarehouseStmt.java @@ -0,0 +1,41 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.sql.ast.warehouse; + +import com.starrocks.sql.ast.AstVisitor; +import com.starrocks.sql.ast.DdlStmt; +import com.starrocks.sql.parser.NodePosition; + +public class SuspendWarehouseStmt extends DdlStmt { + private String warehouseName; + + public SuspendWarehouseStmt(String warehouseName) { + this(warehouseName, NodePosition.ZERO); + } + + public SuspendWarehouseStmt(String warehouseName, NodePosition pos) { + super(pos); + this.warehouseName = warehouseName; + } + + public String getWarehouseName() { + return warehouseName; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitSuspendWarehouseStatement(this, context); + } +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java b/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java index f1cc6f18fb31c..c944dc68f8273 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java @@ -106,6 +106,7 @@ import com.starrocks.mysql.MysqlPassword; import com.starrocks.qe.SqlModeHelper; import com.starrocks.scheduler.persist.TaskSchedule; +import com.starrocks.server.WarehouseManager; import com.starrocks.sql.ShowTemporaryTableStmt; import com.starrocks.sql.analyzer.AnalyzerUtils; import com.starrocks.sql.analyzer.RelationId; @@ -450,6 +451,14 @@ import com.starrocks.sql.ast.pipe.DropPipeStmt; import com.starrocks.sql.ast.pipe.PipeName; import com.starrocks.sql.ast.pipe.ShowPipeStmt; +import com.starrocks.sql.ast.warehouse.CreateWarehouseStmt; +import com.starrocks.sql.ast.warehouse.DropWarehouseStmt; +import com.starrocks.sql.ast.warehouse.ResumeWarehouseStmt; +import com.starrocks.sql.ast.warehouse.SetWarehouseStmt; +import com.starrocks.sql.ast.warehouse.ShowClustersStmt; +import com.starrocks.sql.ast.warehouse.ShowNodesStmt; +import com.starrocks.sql.ast.warehouse.ShowWarehousesStmt; +import com.starrocks.sql.ast.warehouse.SuspendWarehouseStmt; import com.starrocks.sql.common.PListCell; import com.starrocks.sql.util.EitherOr; import com.starrocks.statistic.StatsConstants; @@ -3905,16 +3914,28 @@ public ParseNode visitModifyFrontendHostClause(StarRocksParser.ModifyFrontendHos @Override public ParseNode visitAddBackendClause(StarRocksParser.AddBackendClauseContext context) { + String whName = WarehouseManager.DEFAULT_WAREHOUSE_NAME; + if (context.warehouseName != null) { + Identifier identifier = (Identifier) visit(context.identifierOrString()); + whName = identifier.getValue(); + } + List backends = context.string().stream().map(c -> ((StringLiteral) visit(c)).getStringValue()).collect(toList()); - return new AddBackendClause(backends, createPos(context)); + return new AddBackendClause(backends, whName, createPos(context)); } @Override public ParseNode visitDropBackendClause(StarRocksParser.DropBackendClauseContext context) { + String whName = WarehouseManager.DEFAULT_WAREHOUSE_NAME; + if (context.warehouseName != null) { + Identifier identifier = (Identifier) visit(context.identifierOrString()); + whName = identifier.getValue(); + } + List clusters = context.string().stream().map(c -> ((StringLiteral) visit(c)).getStringValue()).collect(toList()); - return new DropBackendClause(clusters, context.FORCE() != null, createPos(context)); + return new DropBackendClause(clusters, context.FORCE() != null, whName, createPos(context)); } @Override @@ -3943,16 +3964,28 @@ public ParseNode visitModifyBackendClause(StarRocksParser.ModifyBackendClauseCon @Override public ParseNode visitAddComputeNodeClause(StarRocksParser.AddComputeNodeClauseContext context) { + String whName = WarehouseManager.DEFAULT_WAREHOUSE_NAME; + if (context.warehouseName != null) { + Identifier identifier = (Identifier) visit(context.identifierOrString()); + whName = identifier.getValue(); + } + List hostPorts = context.string().stream().map(c -> ((StringLiteral) visit(c)).getStringValue()).collect(toList()); - return new AddComputeNodeClause(hostPorts); + return new AddComputeNodeClause(hostPorts, whName, createPos(context)); } @Override public ParseNode visitDropComputeNodeClause(StarRocksParser.DropComputeNodeClauseContext context) { + String whName = WarehouseManager.DEFAULT_WAREHOUSE_NAME; + if (context.warehouseName != null) { + Identifier identifier = (Identifier) visit(context.identifierOrString()); + whName = identifier.getValue(); + } + List hostPorts = context.string().stream().map(c -> ((StringLiteral) visit(c)).getStringValue()).collect(toList()); - return new DropComputeNodeClause(hostPorts, createPos(context)); + return new DropComputeNodeClause(hostPorts, whName, createPos(context)); } @Override @@ -4472,6 +4505,84 @@ public ParseNode visitAlterPipeStatement(StarRocksParser.AlterPipeStatementConte return new AlterPipeStmt(createPos(context), pipeName, alterPipeClause); } + // ---------------------------------------- Warehouse Statement --------------------------------------------------- + @Override + public ParseNode visitCreateWarehouseStatement(StarRocksParser.CreateWarehouseStatementContext context) { + Identifier identifier = (Identifier) visit(context.identifierOrString()); + String whName = identifier.getValue(); + Map properties = null; + if (context.properties() != null) { + properties = new HashMap<>(); + List propertyList = visit(context.properties().property(), Property.class); + for (Property property : propertyList) { + properties.put(property.getKey(), property.getValue()); + } + } + String comment = null; + if (context.comment() != null) { + comment = ((StringLiteral) visit(context.comment())).getStringValue(); + } + return new CreateWarehouseStmt(context.IF() != null, whName, properties, comment, createPos(context)); + } + + @Override + public ParseNode visitSuspendWarehouseStatement(StarRocksParser.SuspendWarehouseStatementContext context) { + String warehouseName = ((Identifier) visit(context.identifier())).getValue(); + return new SuspendWarehouseStmt(warehouseName, createPos(context)); + } + + @Override + public ParseNode visitResumeWarehouseStatement(StarRocksParser.ResumeWarehouseStatementContext context) { + String warehouseName = ((Identifier) visit(context.identifier())).getValue(); + return new ResumeWarehouseStmt(warehouseName, createPos(context)); + } + + @Override + public ParseNode visitDropWarehouseStatement(StarRocksParser.DropWarehouseStatementContext context) { + Identifier identifier = (Identifier) visit(context.identifierOrString()); + String warehouseName = identifier.getValue(); + return new DropWarehouseStmt(context.IF() != null, warehouseName, createPos(context)); + } + + @Override + public ParseNode visitSetWarehouseStatement(StarRocksParser.SetWarehouseStatementContext context) { + Identifier identifier = (Identifier) visit(context.identifierOrString()); + String warehouseName = identifier.getValue(); + return new SetWarehouseStmt(warehouseName, createPos(context)); + } + + @Override + public ParseNode visitShowWarehousesStatement(StarRocksParser.ShowWarehousesStatementContext context) { + String pattern = null; + if (context.pattern != null) { + StringLiteral stringLiteral = (StringLiteral) visit(context.pattern); + pattern = stringLiteral.getValue(); + } + + return new ShowWarehousesStmt(pattern, createPos(context)); + } + + @Override + public ParseNode visitShowClustersStatement(StarRocksParser.ShowClustersStatementContext context) { + String whName = ((Identifier) visit(context.identifier())).getValue(); + return new ShowClustersStmt(whName, createPos(context)); + } + + @Override + public ParseNode visitShowNodesStatement(StarRocksParser.ShowNodesStatementContext context) { + String pattern = null; + String warehouseName = null; + if (context.WAREHOUSE() != null) { + warehouseName = ((Identifier) visit(context.identifier())).getValue(); + } else if (context.WAREHOUSES() != null) { + if (context.pattern != null) { + StringLiteral stringLiteral = (StringLiteral) visit(context.pattern); + pattern = stringLiteral.getValue(); + } + } + return new ShowNodesStmt(warehouseName, pattern, createPos(context)); + } + // ------------------------------------------- Query Statement ----------------------------------------------------- @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 b/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 index f424a3227b21a..2e601b3c55cd4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 +++ b/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 @@ -301,6 +301,16 @@ statement | showDictionaryStatement | cancelRefreshDictionaryStatement + // Warehouse Statement + | createWarehouseStatement + | dropWarehouseStatement + | suspendWarehouseStatement + | resumeWarehouseStatement + | setWarehouseStatement + | showWarehousesStatement + | showClustersStatement + | showNodesStatement + // Unsupported Statement | unsupportedStatement ; @@ -884,11 +894,11 @@ modifyFrontendHostClause ; addBackendClause - : ADD BACKEND string (',' string)* + : ADD BACKEND string (',' string)* (INTO WAREHOUSE warehouseName=identifierOrString)? ; dropBackendClause - : DROP BACKEND string (',' string)* FORCE? + : DROP BACKEND string (',' string)* (FROM WAREHOUSE warehouseName=identifierOrString)? FORCE? ; decommissionBackendClause @@ -901,11 +911,11 @@ modifyBackendClause ; addComputeNodeClause - : ADD COMPUTE NODE string (',' string)* + : ADD COMPUTE NODE string (',' string)* (INTO WAREHOUSE warehouseName=identifierOrString)? ; dropComputeNodeClause - : DROP COMPUTE NODE string (',' string)* + : DROP COMPUTE NODE string (',' string)* (FROM WAREHOUSE warehouseName=identifierOrString)? ; modifyBrokerClause @@ -1879,6 +1889,55 @@ lock_type | LOW_PRIORITY? WRITE ; +// ------------------------------------------- Plan Tuning Statement --------------------------------------------------- +addPlanAdvisorStatement + : ADD INTO PLAN ADVISOR queryStatement; + +clearPlanAdvisorStatement + : CLEAR PLAN ADVISOR; + +delPlanAdvisorStatement + : DELETE PLAN ADVISOR string; + +showPlanAdvisorStatement + : SHOW PLAN ADVISOR; + +// ---------------------------------------- Warehouse Statement --------------------------------------------------------- + +createWarehouseStatement + : CREATE (WAREHOUSE) (IF NOT EXISTS)? warehouseName=identifierOrString + comment? properties? + ; + +dropWarehouseStatement + : DROP WAREHOUSE (IF EXISTS)? warehouseName=identifierOrString + ; + +suspendWarehouseStatement + : SUSPEND WAREHOUSE (IF EXISTS)? identifier + ; + +resumeWarehouseStatement + : RESUME WAREHOUSE (IF EXISTS)? identifier + ; + +setWarehouseStatement + : SET SESSION? WAREHOUSE EQ? identifierOrString + ; + +showWarehousesStatement + : SHOW WAREHOUSES (LIKE pattern=string)? + ; + +showClustersStatement + : SHOW CLUSTERS FROM WAREHOUSE identifier + ; + +showNodesStatement + : SHOW NODES FROM WAREHOUSES (LIKE pattern=string)? + | SHOW NODES FROM WAREHOUSE identifier + ; + // ------------------------------------------- Query Statement --------------------------------------------------------- queryStatement diff --git a/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java b/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java index 8ae50e7e78560..1d43e33d979b8 100644 --- a/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java @@ -51,6 +51,8 @@ import com.starrocks.catalog.Tablet; import com.starrocks.common.AnalysisException; import com.starrocks.common.DdlException; +import com.starrocks.common.ErrorCode; +import com.starrocks.common.ErrorReport; import com.starrocks.common.FeConstants; import com.starrocks.common.Pair; import com.starrocks.common.Status; @@ -59,7 +61,6 @@ import com.starrocks.common.util.concurrent.lock.LockType; import com.starrocks.common.util.concurrent.lock.Locker; import com.starrocks.datacache.DataCacheMetrics; -import com.starrocks.lake.StarOSAgent; import com.starrocks.metric.MetricRepo; import com.starrocks.persist.CancelDecommissionDiskInfo; import com.starrocks.persist.CancelDisableDiskInfo; @@ -74,12 +75,16 @@ import com.starrocks.server.WarehouseManager; import com.starrocks.sql.analyzer.AlterSystemStmtAnalyzer; import com.starrocks.sql.analyzer.SemanticException; +import com.starrocks.sql.ast.AddBackendClause; +import com.starrocks.sql.ast.AddComputeNodeClause; import com.starrocks.sql.ast.DropBackendClause; +import com.starrocks.sql.ast.DropComputeNodeClause; import com.starrocks.sql.ast.ModifyBackendClause; import com.starrocks.system.Backend.BackendState; import com.starrocks.thrift.TNetworkAddress; import com.starrocks.thrift.TResourceGroupUsage; import com.starrocks.thrift.TStatusCode; +import com.starrocks.warehouse.Warehouse; import org.apache.commons.validator.routines.InetAddressValidator; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -123,15 +128,15 @@ public SystemInfoService() { nodeSelector = new NodeSelector(this); } - public void addComputeNodes(List> hostPortPairs) + public void addComputeNodes(AddComputeNodeClause addComputeNodeClause) throws DdlException { - for (Pair pair : hostPortPairs) { + for (Pair pair : addComputeNodeClause.getHostPortPairs()) { checkSameNodeExist(pair.first, pair.second); } - for (Pair pair : hostPortPairs) { - addComputeNode(pair.first, pair.second); + for (Pair pair : addComputeNodeClause.getHostPortPairs()) { + addComputeNode(pair.first, pair.second, addComputeNodeClause.getWarehouse()); } } @@ -163,13 +168,11 @@ public void dropComputeNode(ComputeNode computeNode) { } // Final entry of adding compute node - private void addComputeNode(String host, int heartbeatPort) { + private void addComputeNode(String host, int heartbeatPort, String warehouse) throws DdlException { ComputeNode newComputeNode = new ComputeNode(GlobalStateMgr.getCurrentState().getNextId(), host, heartbeatPort); idToComputeNodeRef.put(newComputeNode.getId(), newComputeNode); setComputeNodeOwner(newComputeNode); - - newComputeNode.setWorkerGroupId(StarOSAgent.DEFAULT_WORKER_GROUP_ID); - newComputeNode.setWarehouseId(WarehouseManager.DEFAULT_WAREHOUSE_ID); + addComputeNodeToWarehouse(newComputeNode, warehouse); // log GlobalStateMgr.getCurrentState().getEditLog().logAddComputeNode(newComputeNode); @@ -184,16 +187,13 @@ public boolean isSingleBackendAndComputeNode() { return idToBackendRef.size() + idToComputeNodeRef.size() == 1; } - /** - * @param hostPortPairs : backend's host and port - */ - public void addBackends(List> hostPortPairs) throws DdlException { - for (Pair pair : hostPortPairs) { + public void addBackends(AddBackendClause addBackendClause) throws DdlException { + for (Pair pair : addBackendClause.getHostPortPairs()) { checkSameNodeExist(pair.first, pair.second); } - for (Pair pair : hostPortPairs) { - addBackend(pair.first, pair.second); + for (Pair pair : addBackendClause.getHostPortPairs()) { + addBackend(pair.first, pair.second, addBackendClause.getWarehouse()); } } @@ -230,8 +230,20 @@ public void setBackendOwner(Backend backend) { backend.setBackendState(BackendState.using); } + public void addComputeNodeToWarehouse(ComputeNode computeNode, String warehouseName) + throws DdlException { + Warehouse warehouse = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouseAllowNull(warehouseName); + // check if the warehouse exist + if (warehouse == null) { + ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_WAREHOUSE, String.format("name: %s", warehouseName)); + } + + computeNode.setWorkerGroupId(warehouse.getAnyWorkerGroupId()); + computeNode.setWarehouseId(warehouse.getId()); + } + // Final entry of adding backend - private void addBackend(String host, int heartbeatPort) { + private void addBackend(String host, int heartbeatPort, String warehouse) throws DdlException { Backend newBackend = new Backend(GlobalStateMgr.getCurrentState().getNextId(), host, heartbeatPort); // update idToBackend idToBackendRef.put(newBackend.getId(), newBackend); @@ -243,6 +255,7 @@ private void addBackend(String host, int heartbeatPort) { // add backend to DEFAULT_CLUSTER setBackendOwner(newBackend); + addComputeNodeToWarehouse(newBackend, warehouse); // log GlobalStateMgr.getCurrentState().getEditLog().logAddBackend(newBackend); @@ -344,21 +357,19 @@ public ShowResultSet modifyBackend(ModifyBackendClause modifyBackendClause) thro } } - public void dropComputeNodes(List> hostPortPairs) throws DdlException { - for (Pair pair : hostPortPairs) { - // check is already exist - if (getComputeNodeWithHeartbeatPort(pair.first, pair.second) == null) { - throw new DdlException("compute node does not exists[" + - NetUtils.getHostPortInAccessibleFormat(pair.first, pair.second) + "]"); - } + public void dropComputeNodes(DropComputeNodeClause dropComputeNodeClause) throws DdlException { + String warehouse = dropComputeNodeClause.getWarehouse(); + // check if the warehouse exist + if (GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouseAllowNull(warehouse) == null) { + ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_WAREHOUSE, String.format("name: %s", warehouse)); } - for (Pair pair : hostPortPairs) { - dropComputeNode(pair.first, pair.second); + for (Pair pair : dropComputeNodeClause.getHostPortPairs()) { + dropComputeNode(pair.first, pair.second, warehouse); } } - public void dropComputeNode(String host, int heartbeatPort) + public void dropComputeNode(String host, int heartbeatPort, String warehouse) throws DdlException { ComputeNode dropComputeNode = getComputeNodeWithHeartbeatPort(host, heartbeatPort); if (dropComputeNode == null) { @@ -366,6 +377,13 @@ public void dropComputeNode(String host, int heartbeatPort) NetUtils.getHostPortInAccessibleFormat(host, heartbeatPort) + "]"); } + // check if warehouseName is right + Warehouse wh = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouseAllowNull(dropComputeNode.getWarehouseId()); + if (wh != null && !warehouse.equalsIgnoreCase(wh.getName())) { + throw new DdlException("compute node [" + host + ":" + heartbeatPort + + "] does not exist in warehouse " + warehouse); + } + // update idToComputeNode idToComputeNodeRef.remove(dropComputeNode.getId()); @@ -392,16 +410,14 @@ public void dropBackends(DropBackendClause dropBackendClause) throws DdlExceptio List> hostPortPairs = dropBackendClause.getHostPortPairs(); boolean needCheckWithoutForce = !dropBackendClause.isForce(); - for (Pair pair : hostPortPairs) { - // check is already exist - if (getBackendWithHeartbeatPort(pair.first, pair.second) == null) { - throw new DdlException("backend does not exists[" + - NetUtils.getHostPortInAccessibleFormat(pair.first, pair.second) + "]"); - } + String warehouse = dropBackendClause.getWarehouse(); + // check if the warehouse exist + if (GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouseAllowNull(warehouse) == null) { + ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_WAREHOUSE, String.format("name: %s", warehouse)); } for (Pair pair : hostPortPairs) { - dropBackend(pair.first, pair.second, needCheckWithoutForce); + dropBackend(pair.first, pair.second, warehouse, needCheckWithoutForce); } } @@ -412,7 +428,7 @@ public void dropBackend(long backendId) throws DdlException { throw new DdlException("Backend[" + backendId + "] does not exist"); } - dropBackend(backend.getHost(), backend.getHeartbeatPort(), false); + dropBackend(backend.getHost(), backend.getHeartbeatPort(), WarehouseManager.DEFAULT_WAREHOUSE_NAME, false); } protected void checkWhenNotForceDrop(Backend droppedBackend) { @@ -455,13 +471,24 @@ protected void checkWhenNotForceDrop(Backend droppedBackend) { } // final entry of dropping backend - public void dropBackend(String host, int heartbeatPort, boolean needCheckWithoutForce) throws DdlException { - if (getBackendWithHeartbeatPort(host, heartbeatPort) == null) { + public void dropBackend(String host, int heartbeatPort, String warehouse, boolean needCheckWithoutForce) throws DdlException { + Backend droppedBackend = getBackendWithHeartbeatPort(host, heartbeatPort); + + if (droppedBackend == null) { throw new DdlException("backend does not exists[" + NetUtils.getHostPortInAccessibleFormat(host, heartbeatPort) + "]"); } - Backend droppedBackend = getBackendWithHeartbeatPort(host, heartbeatPort); + // check if warehouseName is right + Warehouse wh = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouseAllowNull(droppedBackend.getWarehouseId()); + if (wh != null && !warehouse.equalsIgnoreCase(wh.getName())) { + LOG.warn("warehouseName in dropBackends is not equal, " + + "warehouseName from dropBackendClause is {}, while actual one is {}", + warehouse, wh.getName()); + throw new DdlException("backend [" + host + ":" + heartbeatPort + + "] does not exist in warehouse " + warehouse); + } + if (needCheckWithoutForce) { try { checkWhenNotForceDrop(droppedBackend); diff --git a/fe/fe-core/src/main/java/com/starrocks/warehouse/DefaultWarehouse.java b/fe/fe-core/src/main/java/com/starrocks/warehouse/DefaultWarehouse.java index 606c93ec3fb76..6cad46201c9ec 100644 --- a/fe/fe-core/src/main/java/com/starrocks/warehouse/DefaultWarehouse.java +++ b/fe/fe-core/src/main/java/com/starrocks/warehouse/DefaultWarehouse.java @@ -15,8 +15,10 @@ package com.starrocks.warehouse; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.starrocks.lake.StarOSAgent; +import java.util.ArrayList; import java.util.List; public class DefaultWarehouse extends Warehouse { @@ -35,4 +37,32 @@ public DefaultWarehouse(long id, String name) { public List getWorkerGroupIds() { return WORKER_GROUP_ID_LIST; } + + @Override + public Long getAnyWorkerGroupId() { + return StarOSAgent.DEFAULT_WORKER_GROUP_ID; + } + + @Override + public List getWarehouseInfo() { + return Lists.newArrayList( + String.valueOf(getId()), + getName(), + "AVAILABLE", + String.valueOf(0L), + String.valueOf(1L), + String.valueOf(1L), + String.valueOf(1L), + String.valueOf(0L), //TODO: need to be filled after + String.valueOf(0L), //TODO: need to be filled after + "", + "", + "", + comment); + } + + @Override + public List> getWarehouseNodesInfo() { + return new ArrayList<>(); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/warehouse/Warehouse.java b/fe/fe-core/src/main/java/com/starrocks/warehouse/Warehouse.java index c79db05517886..00b942b7ee435 100644 --- a/fe/fe-core/src/main/java/com/starrocks/warehouse/Warehouse.java +++ b/fe/fe-core/src/main/java/com/starrocks/warehouse/Warehouse.java @@ -55,5 +55,11 @@ public void write(DataOutput out) throws IOException { Text.writeString(out, json); } + public abstract Long getAnyWorkerGroupId(); + public abstract List getWorkerGroupIds(); + + public abstract List getWarehouseInfo(); + + public abstract List> getWarehouseNodesInfo(); } diff --git a/fe/fe-core/src/test/java/com/starrocks/analysis/BackendStmtTest.java b/fe/fe-core/src/test/java/com/starrocks/analysis/BackendStmtTest.java index baa4d5750f0f8..1155374934c95 100644 --- a/fe/fe-core/src/test/java/com/starrocks/analysis/BackendStmtTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/analysis/BackendStmtTest.java @@ -19,6 +19,7 @@ import com.google.common.collect.Lists; import com.starrocks.qe.ConnectContext; +import com.starrocks.server.WarehouseManager; import com.starrocks.sql.analyzer.SemanticException; import com.starrocks.sql.ast.AddBackendClause; import com.starrocks.sql.ast.AlterSystemStmt; @@ -42,23 +43,28 @@ public BackendClause createStmt(int type) { switch (type) { case 1: // missing ip - stmt = new AddBackendClause(Lists.newArrayList(":12346")); + stmt = new AddBackendClause(Lists.newArrayList(":12346"), + WarehouseManager.DEFAULT_WAREHOUSE_NAME); break; case 2: // invalid ip - stmt = new AddBackendClause(Lists.newArrayList("asdasd:12345")); + stmt = new AddBackendClause(Lists.newArrayList("asdasd:12345"), + WarehouseManager.DEFAULT_WAREHOUSE_NAME); break; case 3: // invalid port - stmt = new AddBackendClause(Lists.newArrayList("10.1.2.3:123467")); + stmt = new AddBackendClause(Lists.newArrayList("10.1.2.3:123467"), + WarehouseManager.DEFAULT_WAREHOUSE_NAME); break; case 4: // normal add - stmt = new AddBackendClause(Lists.newArrayList("192.168.1.1:12345")); + stmt = new AddBackendClause(Lists.newArrayList("192.168.1.1:12345"), + WarehouseManager.DEFAULT_WAREHOUSE_NAME); break; case 5: // normal remove - stmt = new DropBackendClause(Lists.newArrayList("192.168.1.2:12345")); + stmt = new DropBackendClause(Lists.newArrayList("192.168.1.2:12345"), true, + WarehouseManager.DEFAULT_WAREHOUSE_NAME); break; default: break; diff --git a/fe/fe-core/src/test/java/com/starrocks/cluster/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/com/starrocks/cluster/SystemInfoServiceTest.java index 6469c7d54589f..6f1c4a86845d4 100644 --- a/fe/fe-core/src/test/java/com/starrocks/cluster/SystemInfoServiceTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/cluster/SystemInfoServiceTest.java @@ -48,10 +48,13 @@ import com.starrocks.server.LocalMetastore; import com.starrocks.server.NodeMgr; import com.starrocks.server.RunMode; +import com.starrocks.server.WarehouseManager; import com.starrocks.sql.analyzer.SemanticException; import com.starrocks.sql.ast.AddBackendClause; +import com.starrocks.sql.ast.AddComputeNodeClause; import com.starrocks.sql.ast.AlterSystemStmt; import com.starrocks.sql.ast.DropBackendClause; +import com.starrocks.sql.parser.NodePosition; import com.starrocks.system.Backend; import com.starrocks.system.ComputeNode; import com.starrocks.system.NodeSelector; @@ -93,6 +96,9 @@ public class SystemInfoServiceTest { @Before public void setUp() throws IOException { + WarehouseManager warehouseManager = new WarehouseManager(); + warehouseManager.initDefaultWarehouse(); + new Expectations() { { editLog.logAddBackend((Backend) any); @@ -137,6 +143,10 @@ public void setUp() throws IOException { globalStateMgr.getTabletInvertedIndex(); minTimes = 0; result = invertedIndex; + + globalStateMgr.getWarehouseMgr(); + minTimes = 0; + result = warehouseManager; } }; @@ -228,7 +238,8 @@ public void validHostAndPortTest4() throws Exception { @Test public void addBackendTest() throws AnalysisException { clearAllBackend(); - AddBackendClause stmt = new AddBackendClause(Lists.newArrayList("192.168.0.1:1234")); + AddBackendClause stmt = new AddBackendClause(Lists.newArrayList("192.168.0.1:1234"), + WarehouseManager.DEFAULT_WAREHOUSE_NAME); com.starrocks.sql.analyzer.Analyzer analyzer = new com.starrocks.sql.analyzer.Analyzer( com.starrocks.sql.analyzer.Analyzer.AnalyzerVisitor.getInstance()); new Expectations() { @@ -239,13 +250,13 @@ public void addBackendTest() throws AnalysisException { }; com.starrocks.sql.analyzer.Analyzer.analyze(new AlterSystemStmt(stmt), new ConnectContext(null)); try { - GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addBackends(stmt.getHostPortPairs()); + GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addBackends(stmt); } catch (DdlException e) { Assert.fail(); } try { - GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addBackends(stmt.getHostPortPairs()); + GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addBackends(stmt); } catch (DdlException e) { Assert.assertTrue(e.getMessage().contains("already exists")); } @@ -269,7 +280,8 @@ public void addBackendTest() throws AnalysisException { @Test public void addComputeNodeTest() throws AnalysisException { clearAllBackend(); - AddBackendClause stmt = new AddBackendClause(Lists.newArrayList("192.168.0.1:1234")); + AddComputeNodeClause stmt = new AddComputeNodeClause(Lists.newArrayList("192.168.0.1:1234"), + WarehouseManager.DEFAULT_WAREHOUSE_NAME, NodePosition.ZERO); com.starrocks.sql.analyzer.Analyzer analyzer = new com.starrocks.sql.analyzer.Analyzer( com.starrocks.sql.analyzer.Analyzer.AnalyzerVisitor.getInstance()); @@ -282,7 +294,7 @@ public void addComputeNodeTest() throws AnalysisException { com.starrocks.sql.analyzer.Analyzer.analyze(new AlterSystemStmt(stmt), new ConnectContext(null)); try { - GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addComputeNodes(stmt.getHostPortPairs()); + GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addComputeNodes(stmt); } catch (DdlException e) { Assert.fail(); } @@ -291,7 +303,7 @@ public void addComputeNodeTest() throws AnalysisException { getComputeNodeWithHeartbeatPort("192.168.0.1", 1234)); try { - GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addBackends(stmt.getHostPortPairs()); + GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addComputeNodes(stmt); } catch (DdlException e) { Assert.assertTrue(e.getMessage().contains("Compute node already exists with same host")); } @@ -300,7 +312,8 @@ public void addComputeNodeTest() throws AnalysisException { @Test public void removeBackendTest() throws AnalysisException { clearAllBackend(); - AddBackendClause stmt = new AddBackendClause(Lists.newArrayList("192.168.0.1:1234")); + AddBackendClause stmt = new AddBackendClause(Lists.newArrayList("192.168.0.1:1234"), + WarehouseManager.DEFAULT_WAREHOUSE_NAME); com.starrocks.sql.analyzer.Analyzer analyzer = new com.starrocks.sql.analyzer.Analyzer( com.starrocks.sql.analyzer.Analyzer.AnalyzerVisitor.getInstance()); new Expectations() { @@ -311,12 +324,13 @@ public void removeBackendTest() throws AnalysisException { }; com.starrocks.sql.analyzer.Analyzer.analyze(new AlterSystemStmt(stmt), new ConnectContext(null)); try { - GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addBackends(stmt.getHostPortPairs()); + GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addBackends(stmt); } catch (DdlException e) { e.printStackTrace(); } - DropBackendClause dropStmt = new DropBackendClause(Lists.newArrayList("192.168.0.1:1234")); + DropBackendClause dropStmt = + new DropBackendClause(Lists.newArrayList("192.168.0.1:1234"), true, WarehouseManager.DEFAULT_WAREHOUSE_NAME); com.starrocks.sql.analyzer.Analyzer.analyze(new AlterSystemStmt(dropStmt), new ConnectContext(null)); try { GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().dropBackends(dropStmt); @@ -358,16 +372,18 @@ StarOSAgent getStarOSAgent() { } }; - AddBackendClause stmt2 = new AddBackendClause(Lists.newArrayList("192.168.0.1:1235")); + AddBackendClause stmt2 = new AddBackendClause(Lists.newArrayList("192.168.0.1:1235"), + WarehouseManager.DEFAULT_WAREHOUSE_NAME); com.starrocks.sql.analyzer.Analyzer.analyze(new AlterSystemStmt(stmt2), new ConnectContext(null)); try { - GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addBackends(stmt2.getHostPortPairs()); + GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addBackends(stmt2); } catch (DdlException e) { e.printStackTrace(); } - DropBackendClause dropStmt2 = new DropBackendClause(Lists.newArrayList("192.168.0.1:1235")); + DropBackendClause dropStmt2 = + new DropBackendClause(Lists.newArrayList("192.168.0.1:1235"), true, WarehouseManager.DEFAULT_WAREHOUSE_NAME); com.starrocks.sql.analyzer.Analyzer.analyze(new AlterSystemStmt(dropStmt2), new ConnectContext(null)); try { @@ -388,7 +404,8 @@ StarOSAgent getStarOSAgent() { @Test public void testSeqChooseComputeNodes() { clearAllBackend(); - AddBackendClause stmt = new AddBackendClause(Lists.newArrayList("192.168.0.1:1234")); + AddComputeNodeClause stmt = new AddComputeNodeClause(Lists.newArrayList("192.168.0.1:1234"), + WarehouseManager.DEFAULT_WAREHOUSE_NAME, NodePosition.ZERO); com.starrocks.sql.analyzer.Analyzer analyzer = new com.starrocks.sql.analyzer.Analyzer( com.starrocks.sql.analyzer.Analyzer.AnalyzerVisitor.getInstance()); @@ -401,7 +418,7 @@ public void testSeqChooseComputeNodes() { com.starrocks.sql.analyzer.Analyzer.analyze(new AlterSystemStmt(stmt), new ConnectContext(null)); try { - GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addComputeNodes(stmt.getHostPortPairs()); + GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addComputeNodes(stmt); } catch (DdlException e) { Assert.fail(); } diff --git a/fe/fe-core/src/test/java/com/starrocks/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/com/starrocks/system/SystemInfoServiceTest.java index e1031901324b7..5189c6ce9c281 100644 --- a/fe/fe-core/src/test/java/com/starrocks/system/SystemInfoServiceTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/system/SystemInfoServiceTest.java @@ -21,6 +21,7 @@ import com.starrocks.server.GlobalStateMgr; import com.starrocks.server.LocalMetastore; import com.starrocks.server.RunMode; +import com.starrocks.server.WarehouseManager; import com.starrocks.service.FrontendOptions; import com.starrocks.sql.analyzer.AlterSystemStmtAnalyzer; import com.starrocks.sql.analyzer.SemanticException; @@ -180,6 +181,9 @@ public RunMode getCurrentRunMode() { LocalMetastore localMetastore = new LocalMetastore(globalStateMgr, null, null); + WarehouseManager warehouseManager = new WarehouseManager(); + warehouseManager.initDefaultWarehouse(); + new Expectations() { { service.getBackendWithHeartbeatPort("newHost", 1000); @@ -189,12 +193,16 @@ public RunMode getCurrentRunMode() { globalStateMgr.getLocalMetastore(); minTimes = 0; result = localMetastore; + + globalStateMgr.getWarehouseMgr(); + minTimes = 0; + result = warehouseManager; } }; service.addBackend(be); be.setStarletPort(1001); - service.dropBackend("newHost", 1000, false); + service.dropBackend("newHost", 1000, WarehouseManager.DEFAULT_WAREHOUSE_NAME, false); Backend beIP = service.getBackendWithHeartbeatPort("newHost", 1000); Assert.assertTrue(beIP == null); }