Skip to content

Commit

Permalink
[Enhancement] Rebase warehouse management framework (backport #52178) (
Browse files Browse the repository at this point in the history
…#52449)

Co-authored-by: Harbor Liu <[email protected]>
  • Loading branch information
mergify[bot] and HangyuanLiu authored Oct 30, 2024
1 parent de7ec77 commit 5c9fa21
Show file tree
Hide file tree
Showing 50 changed files with 1,768 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public Void visitModifyFrontendHostClause(ModifyFrontendAddressClause clause, Vo
@Override
public Void visitAddBackendClause(AddBackendClause clause, Void context) {
ErrorReport.wrapWithRuntimeException(() -> {
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addBackends(clause.getHostPortPairs());
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addBackends(clause);
});
return null;
}
Expand Down Expand Up @@ -291,15 +291,15 @@ public Void visitModifyBrokerClause(ModifyBrokerClause clause, Void context) {
@Override
public Void visitAddComputeNodeClause(AddComputeNodeClause clause, Void context) {
ErrorReport.wrapWithRuntimeException(() -> {
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addComputeNodes(clause.getHostPortPairs());
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addComputeNodes(clause);
});
return null;
}

@Override
public Void visitDropComputeNodeClause(DropComputeNodeClause clause, Void context) {
ErrorReport.wrapWithRuntimeException(() -> {
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().dropComputeNodes(clause.getHostPortPairs());
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().dropComputeNodes(clause);
});
return null;
}
Expand Down
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2666,6 +2666,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static int lake_compaction_history_size = 20;

@ConfField(mutable = true)
public static String lake_compaction_warehouse = "default_warehouse";

@ConfField(mutable = true)
public static String lake_background_warehouse = "default_warehouse";

// e.g. "tableId1;tableId2"
@ConfField(mutable = true)
public static String lake_compaction_disable_tables = "";
Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import com.starrocks.persist.DropPartitionInfo;
import com.starrocks.persist.DropResourceOperationLog;
import com.starrocks.persist.DropStorageVolumeLog;
import com.starrocks.persist.DropWarehouseLog;
import com.starrocks.persist.GlobalVarPersistInfo;
import com.starrocks.persist.HbPackage;
import com.starrocks.persist.ImpersonatePrivInfo;
Expand Down Expand Up @@ -161,6 +162,7 @@
import com.starrocks.system.Frontend;
import com.starrocks.transaction.TransactionState;
import com.starrocks.transaction.TransactionStateBatch;
import com.starrocks.warehouse.Warehouse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -955,6 +957,14 @@ public void readFields(DataInput in) throws IOException {
data = new Text(Text.readBinary(in));
break;
}
case OperationType.OP_CREATE_WAREHOUSE:
case OperationType.OP_ALTER_WAREHOUSE:
data = GsonUtils.GSON.fromJson(Text.readString(in), Warehouse.class);
break;
case OperationType.OP_DROP_WAREHOUSE: {
data = DropWarehouseLog.read(in);
break;
}
default: {
if (Config.metadata_ignore_unknown_operation_type) {
LOG.warn("UNKNOWN Operation Type {}", opCode);
Expand Down
49 changes: 49 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.staros.proto.StatusCode;
import com.staros.proto.UpdateMetaGroupInfo;
import com.staros.proto.WorkerGroupDetailInfo;
import com.staros.proto.WorkerGroupSpec;
import com.staros.proto.WorkerInfo;
import com.staros.util.LockCloseable;
import com.starrocks.common.Config;
Expand All @@ -56,6 +57,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -714,6 +716,53 @@ public List<String> listWorkerGroupIpPort(long workerGroupId) throws UserExcepti
}
}

// remove previous worker with same backend id
private void tryRemovePreviousWorkerGroup(long workerGroupId) {
try (LockCloseable lock = new LockCloseable(rwLock.writeLock())) {
Iterator<Map.Entry<Long, Long>> iterator = workerToNode.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, Long> entry = iterator.next();
long nodeId = entry.getValue();
long workerId = entry.getKey();
ComputeNode node = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getBackendOrComputeNode(nodeId);
if (node.getWorkerGroupId() == workerGroupId) {
iterator.remove();
workerToId.entrySet().removeIf(e -> e.getValue() == workerId);
}
}
}
}

public long createWorkerGroup(String size) throws DdlException {
prepare();

// size should be x0, x1, x2, x4...
WorkerGroupSpec spec = WorkerGroupSpec.newBuilder().setSize(size).build();
// owner means tenant, now there is only one tenant, so pass "Starrocks" to starMgr
String owner = "Starrocks";
WorkerGroupDetailInfo result = null;
try {
result = client.createWorkerGroup(serviceId, owner, spec, Collections.emptyMap(),
Collections.emptyMap());
} catch (StarClientException e) {
LOG.warn("Failed to create worker group. error: {}", e.getMessage());
throw new DdlException("Failed to create worker group. error: " + e.getMessage());
}
return result.getGroupId();
}

public void deleteWorkerGroup(long groupId) throws DdlException {
prepare();
try {
client.deleteWorkerGroup(serviceId, groupId);
} catch (StarClientException e) {
LOG.warn("Failed to delete worker group {}. error: {}", groupId, e.getMessage());
throw new DdlException("Failed to delete worker group. error: " + e.getMessage());
}

tryRemovePreviousWorkerGroup(groupId);
}

// dump all starmgr meta, for DEBUG purpose
public String dump() {
prepare();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.persist;

import com.google.gson.annotations.SerializedName;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.persist.gson.GsonUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class DropWarehouseLog implements Writable {
@SerializedName(value = "name")
private String warehouseName;

public DropWarehouseLog(String warehouseName) {
this.warehouseName = warehouseName;
}

public String getWarehouseName() {
return warehouseName;
}

@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}

public static DropWarehouseLog read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, DropWarehouseLog.class);
}
}
24 changes: 22 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import com.starrocks.scheduler.persist.TaskRunStatusChange;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.LocalMetastore;
import com.starrocks.server.WarehouseManager;
import com.starrocks.sql.ast.UserIdentity;
import com.starrocks.staros.StarMgrJournal;
import com.starrocks.staros.StarMgrServer;
Expand All @@ -115,6 +116,7 @@
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.transaction.TransactionState;
import com.starrocks.transaction.TransactionStateBatch;
import com.starrocks.warehouse.Warehouse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -1198,6 +1200,24 @@ public void loadJournal(GlobalStateMgr globalStateMgr, JournalEntity journal)
GlobalStateMgr.getCurrentState().getKeyMgr().replayAddKey(keyPB);
break;
}
case OperationType.OP_CREATE_WAREHOUSE: {
Warehouse wh = (Warehouse) journal.getData();
WarehouseManager warehouseMgr = globalStateMgr.getWarehouseMgr();
warehouseMgr.replayCreateWarehouse(wh);
break;
}
case OperationType.OP_DROP_WAREHOUSE: {
DropWarehouseLog log = (DropWarehouseLog) journal.getData();
WarehouseManager warehouseMgr = globalStateMgr.getWarehouseMgr();
warehouseMgr.replayDropWarehouse(log);
break;
}
case OperationType.OP_ALTER_WAREHOUSE: {
Warehouse wh = (Warehouse) journal.getData();
WarehouseManager warehouseMgr = globalStateMgr.getWarehouseMgr();
warehouseMgr.replayAlterWarehouse(wh);
break;
}
default: {
if (Config.metadata_ignore_unknown_operation_type) {
LOG.warn("UNKNOWN Operation Type {}", opCode);
Expand All @@ -1217,7 +1237,7 @@ public void loadJournal(GlobalStateMgr globalStateMgr, JournalEntity journal)
/**
* submit log to queue, wait for JournalWriter
*/
protected void logEdit(short op, Writable writable) {
public void logEdit(short op, Writable writable) {
JournalTask task = submitLog(op, writable, -1);
waitInfinity(task);
}
Expand Down Expand Up @@ -1971,7 +1991,7 @@ public void logPipeOp(PipeOpEntry opEntry) {
logEdit(OperationType.OP_PIPE, opEntry);
}

private void logJsonObject(short op, Object obj) {
public void logJsonObject(short op, Object obj) {
logEdit(op, out -> Text.writeString(out, GsonUtils.GSON.toJson(obj)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -931,10 +931,15 @@ public class OperationType {
public static final short OP_ADD_KEY = 13512;

/**
* NOTICE: OperationType cannot use a value exceeding 20000, and an error will be reported if it exceeds
* NOTICE: OperationType cannot use a value exceeding 20000, please follow the above sequence number
*/
public static final short OP_TYPE_EOF = 20000;

// warehouse
public static final short OP_CREATE_WAREHOUSE = 20101;
public static final short OP_DROP_WAREHOUSE = 20102;
public static final short OP_ALTER_WAREHOUSE = 20103;

public static final ImmutableSet<Short> IGNORABLE_OPERATIONS = buildIgnorableOperations();

private static ImmutableSet<Short> buildIgnorableOperations() {
Expand All @@ -953,7 +958,8 @@ private static ImmutableSet<Short> buildIgnorableOperations() {
System.exit(-1);
}

if (opType > OperationType.OP_TYPE_EOF) {
if (opType > OperationType.OP_TYPE_EOF &&
opType != OP_CREATE_WAREHOUSE && opType != OP_DROP_WAREHOUSE && opType != OP_ALTER_WAREHOUSE) {
LOG.fatal("OperationType cannot use a value exceeding 20000, " +
"and an error will be reported if it exceeds : {} = {}", field.getName(), opType);
System.exit(-1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@
import com.starrocks.privilege.TablePEntryObject;
import com.starrocks.privilege.UserPEntryObject;
import com.starrocks.privilege.ViewPEntryObject;
import com.starrocks.privilege.WarehouseFCPEntryObject;
import com.starrocks.privilege.WarehousePEntryObject;
import com.starrocks.proto.EncryptionKeyPB;
import com.starrocks.replication.ReplicationTxnCommitAttachment;
import com.starrocks.server.SharedDataStorageVolumeMgr;
Expand Down Expand Up @@ -338,7 +338,7 @@ public class GsonUtils {
.registerSubtype(CatalogPEntryObject.class, "CatalogPEntryObject")
.registerSubtype(ResourceGroupPEntryObject.class, "ResourceGroupPEntryObject")
.registerSubtype(StorageVolumePEntryObject.class, "StorageVolumePEntryObject")
.registerSubtype(WarehouseFCPEntryObject.class, "WarehousePEntryObject")
.registerSubtype(WarehousePEntryObject.class, "WarehousePEntryObject")
.registerSubtype(PipePEntryObject.class, "PipePEntryObject")
.registerSubtype(PolicyFCEntryObject.class, "PolicyPEntryObject");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ public int getId() {

public static final SRMetaBlockID PIPE_MGR = new SRMetaBlockID(32);

/**
* NOTICE: SRMetaBlockID cannot use a value exceeding 20000, please follow the above sequence number
*/
public static final SRMetaBlockID WAREHOUSE_MGR = new SRMetaBlockID(20001);

@Override
public String toString() {
return String.valueOf(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,4 +189,14 @@ default Map<String, Expr> getColumnMaskingPolicy(ConnectContext context, TableNa
default Expr getRowAccessPolicy(ConnectContext currentUser, TableName tableName) {
return null;
}

default void checkWarehouseAction(UserIdentity currentUser, Set<Long> roleIds, String name, PrivilegeType privilegeType)
throws AccessDeniedException {
throw new AccessDeniedException();
}

default void checkAnyActionOnWarehouse(UserIdentity currentUser, Set<Long> roleIds, String name)
throws AccessDeniedException {
throw new AccessDeniedException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ public void initBuiltinRolesAndUsers() {
ObjectType.RESOURCE_GROUP,
ObjectType.PIPE,
ObjectType.GLOBAL_FUNCTION,
ObjectType.STORAGE_VOLUME);
ObjectType.STORAGE_VOLUME,
ObjectType.WAREHOUSE);
for (ObjectType objectType : objectTypes) {
initPrivilegeCollectionAllObjects(rolePrivilegeCollection, objectType,
provider.getAvailablePrivType(objectType));
Expand Down Expand Up @@ -195,9 +196,11 @@ public void initBuiltinRolesAndUsers() {
initPrivilegeCollections(
rolePrivilegeCollection,
ObjectType.SYSTEM,
List.of(PrivilegeType.NODE),
List.of(PrivilegeType.NODE, PrivilegeType.CREATE_WAREHOUSE),
null,
false);
initPrivilegeCollectionAllObjects(rolePrivilegeCollection, ObjectType.WAREHOUSE,
provider.getAvailablePrivType(ObjectType.WAREHOUSE));
rolePrivilegeCollection.disableMutable();

// 4. user_admin
Expand Down Expand Up @@ -270,7 +273,8 @@ protected void initPrivilegeCollectionAllObjects(
} else if (ObjectType.RESOURCE.equals(objectType)
|| ObjectType.CATALOG.equals(objectType)
|| ObjectType.RESOURCE_GROUP.equals(objectType)
|| ObjectType.STORAGE_VOLUME.equals(objectType)) {
|| ObjectType.STORAGE_VOLUME.equals(objectType)
|| ObjectType.WAREHOUSE.equals(objectType)) {
objects.add(provider.generateObject(objectType,
Lists.newArrayList("*"), globalStateMgr));
collection.grant(objectType, actionList, objects, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public DefaultAuthorizationProvider() {
PrivilegeType.REPOSITORY,
PrivilegeType.CREATE_RESOURCE_GROUP,
PrivilegeType.CREATE_GLOBAL_FUNCTION,
PrivilegeType.CREATE_STORAGE_VOLUME));
PrivilegeType.CREATE_STORAGE_VOLUME,
PrivilegeType.CREATE_WAREHOUSE));

typeToActionList.put(ObjectType.USER, Lists.newArrayList(
PrivilegeType.IMPERSONATE));
Expand Down Expand Up @@ -114,6 +115,11 @@ public DefaultAuthorizationProvider() {
PrivilegeType.DROP,
PrivilegeType.ALTER,
PrivilegeType.USAGE));

typeToActionList.put(ObjectType.WAREHOUSE, Lists.newArrayList(
PrivilegeType.USAGE,
PrivilegeType.ALTER,
PrivilegeType.DROP));
}

public static final String UNEXPECTED_TYPE = "unexpected type ";
Expand Down Expand Up @@ -185,6 +191,8 @@ public PEntryObject generateObject(ObjectType objectType, List<String> objectTok
return StorageVolumePEntryObject.generate(mgr, objectTokens);
} else if (ObjectType.PIPE.equals(objectType)) {
return PipePEntryObject.generate(mgr, objectTokens);
} else if (ObjectType.WAREHOUSE.equals(objectType)) {
return WarehousePEntryObject.generate(mgr, objectTokens);
}
throw new PrivilegeException(UNEXPECTED_TYPE + objectType.name());
}
Expand Down
Loading

0 comments on commit 5c9fa21

Please sign in to comment.