
Commit

[Enhancement] Rebase warehouse proc framework (#52468)
(cherry picked from commit fadd6b6)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadMgr.java
#	fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java
HangyuanLiu committed Nov 1, 2024
1 parent afcf944 commit b281eb7
Showing 21 changed files with 821 additions and 24 deletions.
@@ -43,16 +43,19 @@
import com.starrocks.privilege.StorageVolumePEntryObject;
import com.starrocks.privilege.TablePEntryObject;
import com.starrocks.privilege.UserPEntryObject;
import com.starrocks.privilege.WarehousePEntryObject;
import com.starrocks.server.CatalogMgr;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.MetadataMgr;
import com.starrocks.server.StorageVolumeMgr;
import com.starrocks.server.WarehouseManager;
import com.starrocks.sql.ast.UserIdentity;
import com.starrocks.thrift.TGetGrantsToRolesOrUserItem;
import com.starrocks.thrift.TGetGrantsToRolesOrUserRequest;
import com.starrocks.thrift.TGetGrantsToRolesOrUserResponse;
import com.starrocks.thrift.TGrantsToType;
import com.starrocks.thrift.TSchemaTableType;
import com.starrocks.warehouse.Warehouse;

import java.util.ArrayList;
import java.util.HashSet;
@@ -397,6 +400,24 @@ private static Set<TGetGrantsToRolesOrUserItem> getGrantItems(
} else if (ObjectType.PIPE.equals(privEntry.getKey())) {
PipePEntryObject pipePEntryObject = (PipePEntryObject) privilegeEntry.getObject();
objects.addAll(pipePEntryObject.expandObjectNames());
} else if (ObjectType.WAREHOUSE.equals(privEntry.getKey())) {
WarehousePEntryObject warehousePEntryObject =
(WarehousePEntryObject) privilegeEntry.getObject();
long warehouseId = warehousePEntryObject.getId();
if (warehouseId == PrivilegeBuiltinConstants.ALL_WAREHOUSES_ID) {
WarehouseManager warehouseManager = GlobalStateMgr.getCurrentState().getWarehouseMgr();
Set<String> allWarehouseNames = warehouseManager.getAllWarehouseNames();
for (String warehouseName : allWarehouseNames) {
objects.add(Lists.newArrayList(null, null, warehouseName));
}
} else {
Warehouse warehouse =
GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouse(warehouseId);
if (warehouse == null) {
continue;
}
objects.add(Lists.newArrayList(null, null, warehouse.getName()));
}
}

ActionSet actionSet = privilegeEntry.getActionSet();

@@ -38,6 +38,7 @@
import com.starrocks.common.AnalysisException;
import com.starrocks.consistency.MetaRecoveryProdDir;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.warehouse.WarehouseProcDir;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -70,6 +71,7 @@ private ProcService() {
root.register("colocation_group", new ColocationGroupProcDir());
root.register("catalog", GlobalStateMgr.getCurrentState().getCatalogMgr().getProcNode());
root.register("compactions", new CompactionsProcNode());
root.register("warehouses", new WarehouseProcDir());
root.register("meta_recovery", new MetaRecoveryProdDir());
root.register("replications", new ReplicationsProcNode());
}
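
Registering the new directory makes warehouse status reachable through the proc tree, which in StarRocks is typically surfaced from SQL as SHOW PROC '/warehouses'. The WarehouseProcDir class itself is added elsewhere in this commit and is not expanded in the hunks shown here; the sketch below only illustrates the general shape of a read-only proc node serving the same data. Every interface name, import path, and signature in it is an assumption based on the existing proc framework, not something taken from this diff.

// Hypothetical sketch — not the committed WarehouseProcDir. ProcNodeInterface,
// ProcResult, BaseProcResult and their import paths are assumed from the
// existing proc framework; only the WarehouseManager/Warehouse calls appear in
// this commit's visible code.
package com.starrocks.common.proc;

import com.google.common.collect.Lists;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.warehouse.Warehouse;

public class WarehousesProcNodeSketch implements ProcNodeInterface {
    @Override
    public ProcResult fetchResult() {
        BaseProcResult result = new BaseProcResult();
        // Column names are illustrative; the real column set is defined by WarehouseProcDir.
        result.setNames(Lists.newArrayList("Id", "Name"));
        for (Warehouse warehouse : GlobalStateMgr.getCurrentState().getWarehouseMgr().getAllWarehouses()) {
            result.addRow(Lists.newArrayList(String.valueOf(warehouse.getId()), warehouse.getName()));
        }
        return result;
    }
}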

@@ -39,6 +39,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class Locker {
private static final Logger LOG = LogManager.getLogger(Locker.class);
@@ -357,7 +358,7 @@ public void lockTablesWithIntensiveDbLock(Database database, List<Long> tableList, LockType lockType,
/**
* No need to release the lock explicitly; it is released automatically if the lock attempt fails.
*/
public boolean tryLockTablesWithIntensiveDbLock(Database database, List<Long> tableList, LockType lockType,
public boolean tryLockTablesWithIntensiveDbLock(Database database, List<Long> tableList, LockType lockType,
long timeout, TimeUnit unit) {
long timeoutMillis = timeout;
if (!unit.equals(TimeUnit.MILLISECONDS)) {
@@ -518,6 +519,7 @@ public boolean tryLockTableWithIntensiveDbLock(Database db, Long tableId, LockTy

/**
* Try to lock multiple databases and tables with an intensive db lock.
*
* @return true if the lock is acquired successfully, false otherwise.
*/
public boolean tryLockTableWithIntensiveDbLock(LockParams lockParams, LockType lockType, long timeout, TimeUnit unit) {
@@ -586,10 +588,15 @@ void clearWaitingFor() {
}

private String getStackTrace(Thread thread) {
StackTraceElement[] stackTrace = thread.getStackTrace();
StackTraceElement element = stackTrace[3];
int lastIdx = element.getClassName().lastIndexOf(".");
return element.getClassName().substring(lastIdx + 1) + "." + element.getMethodName() + "():" + element.getLineNumber();
List<String> frames = StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE)
.walk(stackFrame -> stackFrame.skip(2).limit(1)
.map(frame -> frame.getClassName() + "." + frame.getMethodName() + "():" + frame.getLineNumber())
.collect(Collectors.toList()));
if (frames.size() > 0) {
return frames.get(0);
} else {
return "";
}
}

public String getLockerStackTrace() {
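
The getStackTrace() change above replaces the old Thread.getStackTrace() lookup with a hard-coded array index by StackWalker, which lazily walks only the frames it needs and falls back to an empty string when the expected frame is missing. A standalone illustration of the same pattern (not StarRocks code) that reports the frame two levels above the probing method:

// Self-contained demo of the StackWalker pattern used in Locker.getStackTrace().
public class CallerProbe {
    static String callerFrame() {
        return StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE)
                .walk(frames -> frames.skip(2) // skip callerFrame() and its direct caller
                        .map(f -> f.getClassName() + "." + f.getMethodName() + "():" + f.getLineNumber())
                        .findFirst()
                        .orElse(""));
    }

    static String helper() {
        return callerFrame(); // skip(2) jumps over callerFrame() and helper(), reporting main()
    }

    public static void main(String[] args) {
        System.out.println(helper()); // prints something like CallerProbe.main():<line>
    }
}
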
@@ -0,0 +1,109 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.http.rest;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.starrocks.metric.WarehouseMetricMgr;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.warehouse.Warehouse;
import com.starrocks.warehouse.WarehouseInfo;
import com.starrocks.warehouse.WarehouseLoadStatusInfo;

import java.util.Map;

public class WarehouseInfosBuilder {
private final Map<Long, WarehouseInfo> warehouseToInfo = Maps.newHashMap();

public static WarehouseInfosBuilder makeBuilderFromMetricAndMgrs() {
WarehouseInfosBuilder builder = new WarehouseInfosBuilder();

for (Warehouse warehouse : GlobalStateMgr.getCurrentState().getWarehouseMgr().getAllWarehouses()) {
builder.withWarehouseInfo(new WarehouseInfo(warehouse.getId(), warehouse.getName()));
}

return builder
.withNumUnfinishedQueries(WarehouseMetricMgr.getUnfinishedQueries())
.withNumUnfinishedBackupJobs(WarehouseMetricMgr.getUnfinishedBackupJobs())
.withNumUnfinishedRestoreJobs(WarehouseMetricMgr.getUnfinishedRestoreJobs())
.withLastFinishedJobTimestampMs(WarehouseMetricMgr.getLastFinishedJobTimestampMs())
.withLoadStatusInfo(GlobalStateMgr.getCurrentState().getLoadMgr().getWarehouseLoadInfo())
.withLoadStatusInfo(GlobalStateMgr.getCurrentState().getRoutineLoadMgr().getWarehouseLoadInfo())
.withLoadStatusInfo(GlobalStateMgr.getCurrentState().getStreamLoadMgr().getWarehouseLoadInfo());
}

@VisibleForTesting
WarehouseInfosBuilder() {
}

public Map<Long, WarehouseInfo> build() {
return warehouseToInfo;
}

public WarehouseInfosBuilder withNumUnfinishedQueries(Map<Long, Long> warehouseToDelta) {
return withUpdater(warehouseToDelta, WarehouseInfo::increaseNumUnfinishedQueryJobs);
}

public WarehouseInfosBuilder withNumUnfinishedBackupJobs(Map<Long, Long> warehouseToDelta) {
return withUpdater(warehouseToDelta, WarehouseInfo::increaseNumUnfinishedBackupJobs);
}

public WarehouseInfosBuilder withNumUnfinishedRestoreJobs(Map<Long, Long> warehouseToDelta) {
return withUpdater(warehouseToDelta, WarehouseInfo::increaseNumUnfinishedRestoreJobs);
}

public WarehouseInfosBuilder withLastFinishedJobTimestampMs(Map<Long, Long> warehouseToTimestampMs) {
return withUpdater(warehouseToTimestampMs, WarehouseInfo::updateLastFinishedJobTimeMs);
}

public WarehouseInfosBuilder withLoadStatusInfo(Map<Long, WarehouseLoadStatusInfo> warehouseToLoadInfo) {
warehouseToLoadInfo.forEach((warehouseId, loadInfo) -> {
WarehouseInfo info = warehouseToInfo.computeIfAbsent(warehouseId, WarehouseInfo::new);
info.increaseNumUnfinishedLoadJobs(loadInfo.getNumUnfinishedJobs());
info.updateLastFinishedJobTimeMs(loadInfo.getLastFinishedJobTimeMs());
});
return this;
}

public WarehouseInfosBuilder withWarehouseInfo(WarehouseInfo info) {
if (!warehouseToInfo.containsKey(info.getId())) {
warehouseToInfo.put(info.getId(), info);
} else {
WarehouseInfo destInfo = warehouseToInfo.get(info.getId());
destInfo.increaseNumUnfinishedQueryJobs(info.getNumUnfinishedQueryJobs());
destInfo.increaseNumUnfinishedLoadJobs(info.getNumUnfinishedLoadJobs());
destInfo.increaseNumUnfinishedBackupJobs(info.getNumUnfinishedBackupJobs());
destInfo.increaseNumUnfinishedRestoreJobs(info.getNumUnfinishedRestoreJobs());
destInfo.updateLastFinishedJobTimeMs(info.getLastFinishedJobTimestampMs());
}
return this;
}

@FunctionalInterface
interface BiConsumer<T1, T2> {
void accept(T1 t1, T2 t2);
}

private WarehouseInfosBuilder withUpdater(Map<Long, Long> warehouseIdToValue,
BiConsumer<WarehouseInfo, Long> updater) {
warehouseIdToValue.forEach((warehouseId, delta) -> {
WarehouseInfo info = warehouseToInfo.computeIfAbsent(warehouseId, WarehouseInfo::new);
updater.accept(info, delta);
});

return this;
}

}
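
A minimal sketch of how a caller might consume this builder. The wrapping class below is hypothetical; only makeBuilderFromMetricAndMgrs(), build(), and the WarehouseInfo getters used here appear in this commit, and the code assumes a fully initialized FE so that GlobalStateMgr returns real managers.

// Hypothetical caller, for illustration only.
import com.starrocks.http.rest.WarehouseInfosBuilder;
import com.starrocks.warehouse.WarehouseInfo;

import java.util.Map;

public class WarehouseInfoDumpSketch {
    public static void main(String[] args) {
        Map<Long, WarehouseInfo> infos = WarehouseInfosBuilder.makeBuilderFromMetricAndMgrs().build();
        infos.forEach((warehouseId, info) ->
                System.out.println(warehouseId + " -> unfinished queries: " + info.getNumUnfinishedQueryJobs()
                        + ", unfinished loads: " + info.getNumUnfinishedLoadJobs()));
    }
}
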
18 changes: 17 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadJob.java
@@ -89,6 +89,7 @@
import com.starrocks.transaction.TabletFailInfo;
import com.starrocks.transaction.TransactionException;
import com.starrocks.transaction.TransactionState;
import com.starrocks.warehouse.LoadJobWithWarehouse;
import com.starrocks.warehouse.Warehouse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -103,7 +104,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

public abstract class LoadJob extends AbstractTxnStateChangeCallback implements LoadTaskCallback, Writable {
public abstract class LoadJob extends AbstractTxnStateChangeCallback implements LoadTaskCallback, Writable, LoadJobWithWarehouse {

private static final Logger LOG = LogManager.getLogger(LoadJob.class);

@@ -242,10 +243,25 @@ protected void writeUnlock() {
lock.writeLock().unlock();
}

@Override
public long getCurrentWarehouseId() {
return warehouseId;
}

public void setWarehouseId(long warehouseId) {
this.warehouseId = warehouseId;
}

@Override
public boolean isFinal() {
return isCompleted();
}

@Override
public long getFinishTimestampMs() {
return getFinishTimestamp();
}

public long getId() {
return id;
}
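
The LoadJobWithWarehouse interface imported above is not visible in these hunks. Judging from the methods that LoadJob overrides here (and that RoutineLoadJob overrides further below), it presumably looks roughly like the following reconstruction, which may differ from the committed file:

// Hypothetical reconstruction of com.starrocks.warehouse.LoadJobWithWarehouse,
// inferred only from the @Override methods in this diff.
package com.starrocks.warehouse;

public interface LoadJobWithWarehouse {
    long getCurrentWarehouseId();

    boolean isFinal();

    long getFinishTimestampMs();
}
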
16 changes: 16 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadMgr.java
@@ -75,6 +75,8 @@
import com.starrocks.thrift.TUniqueId;
import com.starrocks.transaction.TransactionState;
import com.starrocks.transaction.TransactionStatus;
import com.starrocks.warehouse.WarehouseLoadInfoBuilder;
import com.starrocks.warehouse.WarehouseLoadStatusInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -117,6 +119,9 @@ public class LoadMgr implements Writable, MemoryTrackable {

private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

protected final WarehouseLoadInfoBuilder warehouseLoadInfoBuilder =
new WarehouseLoadInfoBuilder();

public LoadMgr(LoadJobScheduler loadJobScheduler) {
this.loadJobScheduler = loadJobScheduler;
}
@@ -388,6 +393,8 @@ private void unprotectedRemoveJobReleatedMeta(LoadJob job) {
if (job instanceof SparkLoadJob) {
((SparkLoadJob) job).clearSparkLauncherLog();
}

warehouseLoadInfoBuilder.withRemovedJob(job);
}

private boolean isJobExpired(LoadJob job, long currentTimeMs) {
@@ -792,6 +799,15 @@ public void loadLoadJobsV2JsonFormat(SRMetaBlockReader reader)
});
}

public Map<Long, WarehouseLoadStatusInfo> getWarehouseLoadInfo() {
readLock();
try {
return warehouseLoadInfoBuilder.buildFromJobs(idToLoadJob.values());
} finally {
readUnlock();
}
}

private void putLoadJob(LoadJob loadJob) {
idToLoadJob.put(loadJob.getId(), loadJob);
Map<String, List<LoadJob>> map =

@@ -101,6 +101,7 @@
import com.starrocks.transaction.TransactionException;
import com.starrocks.transaction.TransactionState;
import com.starrocks.transaction.TransactionStatus;
import com.starrocks.warehouse.LoadJobWithWarehouse;
import com.starrocks.warehouse.Warehouse;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.logging.log4j.LogManager;
@@ -130,7 +131,7 @@
* The routine load job support different streaming medium such as KAFKA and Pulsar
*/
public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback
implements Writable, GsonPostProcessable {
implements Writable, GsonPostProcessable, LoadJobWithWarehouse {
private static final Logger LOG = LogManager.getLogger(RoutineLoadJob.class);

public static final long DEFAULT_MAX_ERROR_NUM = 0;
@@ -339,6 +340,21 @@ public RoutineLoadJob(Long id, String name,
}
}

@Override
public long getCurrentWarehouseId() {
return warehouseId;
}

@Override
public boolean isFinal() {
return state.isFinalState();
}

@Override
public long getFinishTimestampMs() {
return getEndTimestamp();
}

protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException {
if (stmt.getRoutineLoadDesc() != null) {
setRoutineLoadDesc(stmt.getRoutineLoadDesc());
@@ -1735,10 +1751,6 @@ public boolean needRemove() {
return false;
}

public boolean isFinal() {
return state.isFinalState();
}

public static RoutineLoadJob read(DataInput in) throws IOException {
RoutineLoadJob job = null;
LoadDataSourceType type = LoadDataSourceType.valueOf(Text.readString(in));

@@ -74,6 +74,8 @@
import com.starrocks.system.ComputeNode;
import com.starrocks.transaction.TxnCommitAttachment;
import com.starrocks.warehouse.Warehouse;
import com.starrocks.warehouse.WarehouseLoadInfoBuilder;
import com.starrocks.warehouse.WarehouseLoadStatusInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -109,6 +111,9 @@ public class RoutineLoadMgr implements Writable, MemoryTrackable {
private Map<Long, RoutineLoadJob> idToRoutineLoadJob = Maps.newConcurrentMap();
private Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newConcurrentMap();

protected final WarehouseLoadInfoBuilder warehouseLoadStatusInfoBuilder =
new WarehouseLoadInfoBuilder();

private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

private void writeLock() {
@@ -662,6 +667,8 @@ private void unprotectedRemoveJobFromDb(RoutineLoadJob routineLoadJob) {
if (dbToNameToRoutineLoadJob.get(routineLoadJob.getDbId()).isEmpty()) {
dbToNameToRoutineLoadJob.remove(routineLoadJob.getDbId());
}

warehouseLoadStatusInfoBuilder.withRemovedJob(routineLoadJob);
}

public void updateRoutineLoadJob() throws UserException {
@@ -810,6 +817,15 @@ public void loadRoutineLoadJobsV2(SRMetaBlockReader reader)
});
}

public Map<Long, WarehouseLoadStatusInfo> getWarehouseLoadInfo() {
readLock();
try {
return warehouseLoadStatusInfoBuilder.buildFromJobs(idToRoutineLoadJob.values());
} finally {
readUnlock();
}
}

@Override
public Map<String, Long> estimateCount() {
return ImmutableMap.of("RoutineLoad", (long) idToRoutineLoadJob.size());