Skip to content

Commit

Permalink
[AMORO-3066] Optimizing the efficiency for optimizing-processes rest …
Browse files Browse the repository at this point in the history
…api (#3257)

* [AMORO-3066] Optimizing the efficiency for optimizing-processes rest api

* fixup! [AMORO-3066] Optimizing the efficiency for optimizing-processes rest api

* fixup! [AMORO-3066] Optimizing the efficiency for optimizing-processes rest api

* fixup! [AMORO-3066] Optimizing the efficiency for optimizing-processes rest api

* fixup! [AMORO-3066] Optimizing the efficiency for optimizing-processes rest api

* fixup! [AMORO-3066] Optimizing the efficiency for optimizing-processes rest api

* fixup! [AMORO-3066] Optimizing the efficiency for optimizing-processes rest api
  • Loading branch information
klion26 authored Oct 18, 2024
1 parent 0a793da commit d97597f
Show file tree
Hide file tree
Showing 13 changed files with 427 additions and 55 deletions.
6 changes: 6 additions & 0 deletions amoro-ams/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper</artifactId>
<version>${pagehelper.version}</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-data</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.amoro.server.dashboard;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableFormat;
import org.apache.amoro.api.CommitMetaProducer;
Expand Down Expand Up @@ -65,7 +68,6 @@
import org.apache.amoro.utils.MixedDataFiles;
import org.apache.amoro.utils.MixedTableUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.HasTableOperations;
Expand Down Expand Up @@ -505,29 +507,34 @@ public List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable) {
public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
AmoroTable<?> amoroTable, String type, ProcessStatus status, int limit, int offset) {
TableIdentifier tableIdentifier = amoroTable.id();
List<OptimizingProcessMeta> processMetaList =
getAs(
OptimizingMapper.class,
mapper ->
mapper.selectOptimizingProcesses(
tableIdentifier.getCatalog(),
tableIdentifier.getDatabase(),
tableIdentifier.getTableName()));

processMetaList =
processMetaList.stream()
.filter(
p ->
StringUtils.isBlank(type)
|| type.equalsIgnoreCase(p.getOptimizingType().getStatus().displayValue()))
.filter(p -> status == null || status.name().equalsIgnoreCase(p.getStatus().name()))
.collect(Collectors.toList());

int total = processMetaList.size();
processMetaList =
processMetaList.stream().skip(offset).limit(limit).collect(Collectors.toList());
if (CollectionUtils.isEmpty(processMetaList)) {
return Pair.of(Collections.emptyList(), 0);
int total = 0;
// page helper is 1-based
int pageNumber = (offset / limit) + 1;
List<OptimizingProcessMeta> processMetaList = Collections.emptyList();
try (Page<?> ignored = PageHelper.startPage(pageNumber, limit, true)) {
processMetaList =
getAs(
OptimizingMapper.class,
mapper ->
mapper.selectOptimizingProcesses(
tableIdentifier.getCatalog(),
tableIdentifier.getDatabase(),
tableIdentifier.getTableName(),
type,
status,
offset,
limit));
PageInfo<OptimizingProcessMeta> pageInfo = new PageInfo<>(processMetaList);
total = (int) pageInfo.getTotal();
LOG.info(
"Get optimizing processes total : {} , pageNumber:{}, limit:{}, offset:{}",
total,
pageNumber,
limit,
offset);
if (pageInfo.getSize() == 0) {
return Pair.of(Collections.emptyList(), 0);
}
}
List<Long> processIds =
processMetaList.stream()
Expand All @@ -537,6 +544,7 @@ public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
getAs(OptimizingMapper.class, mapper -> mapper.selectOptimizeTaskMetas(processIds)).stream()
.collect(Collectors.groupingBy(OptimizingTaskMeta::getProcessId));

LOG.info("Get {} optimizing tasks. ", optimizingTasks.size());
return Pair.of(
processMetaList.stream()
.map(p -> buildOptimizingProcessInfo(p, optimizingTasks.get(p.getProcessId())))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,16 +310,20 @@ public void getOptimizingProcesses(Context ctx) {
String db = ctx.pathParam("db");
String table = ctx.pathParam("table");
String type = ctx.queryParam("type");

if (StringUtils.isBlank(type)) {
// treat all blank string to null
type = null;
}

String status = ctx.queryParam("status");
Integer page = ctx.queryParamAsClass("page", Integer.class).getOrDefault(1);
Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20);

int offset = (page - 1) * pageSize;
int limit = pageSize;
ServerCatalog serverCatalog = tableService.getServerCatalog(catalog);
Preconditions.checkArgument(offset >= 0, "offset[%s] must >= 0", offset);
Preconditions.checkArgument(limit >= 0, "limit[%s] must >= 0", limit);
Preconditions.checkState(serverCatalog.tableExists(db, table), "no such table");

TableIdentifier tableIdentifier = TableIdentifier.of(catalog, db, table);
ProcessStatus processStatus =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.amoro.server.optimizing;

import org.apache.amoro.process.ProcessStatus;

public interface OptimizingProcess {

long getProcessId();
Expand All @@ -36,7 +38,7 @@ public interface OptimizingProcess {

OptimizingType getOptimizingType();

Status getStatus();
ProcessStatus getStatus();

long getRunningQuotaTime(long calculatingStartTime, long calculatingEndTime);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.amoro.api.OptimizingTaskId;
import org.apache.amoro.exception.OptimizingClosedException;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.manager.MetricManager;
Expand Down Expand Up @@ -354,7 +355,7 @@ private class TableOptimizingProcess implements OptimizingProcess, TaskRuntime.T
private final Map<OptimizingTaskId, TaskRuntime<RewriteStageTask>> taskMap = Maps.newHashMap();
private final Queue<TaskRuntime<RewriteStageTask>> taskQueue = new LinkedList<>();
private final Lock lock = new ReentrantLock();
private volatile Status status = OptimizingProcess.Status.RUNNING;
private volatile ProcessStatus status = ProcessStatus.RUNNING;
private volatile String failedReason;
private long endTime = AmoroServiceConstants.INVALID_TIME;
private Map<String, Long> fromSequence = Maps.newHashMap();
Expand Down Expand Up @@ -396,7 +397,7 @@ public TableOptimizingProcess(TableRuntime tableRuntime) {
if (tableRuntime.getToSequence() != null) {
toSequence = tableRuntime.getToSequence();
}
if (this.status != OptimizingProcess.Status.CLOSED) {
if (this.status != ProcessStatus.CLOSED) {
tableRuntime.recover(this);
}
loadTaskRuntimes(this);
Expand All @@ -413,18 +414,18 @@ public OptimizingType getOptimizingType() {
}

@Override
public Status getStatus() {
public ProcessStatus getStatus() {
return status;
}

@Override
public void close() {
lock.lock();
try {
if (this.status != Status.RUNNING) {
if (this.status != ProcessStatus.RUNNING) {
return;
}
this.status = OptimizingProcess.Status.CLOSED;
this.status = ProcessStatus.CLOSED;
this.endTime = System.currentTimeMillis();
persistProcessCompleted(false);
clearProcess(this);
Expand Down Expand Up @@ -468,7 +469,7 @@ public void acceptResult(TaskRuntime taskRuntime) {
} else {
clearProcess(this);
this.failedReason = taskRuntime.getFailReason();
this.status = OptimizingProcess.Status.FAILED;
this.status = ProcessStatus.FAILED;
this.endTime = taskRuntime.getEndTime();
persistProcessCompleted(false);
}
Expand All @@ -481,15 +482,14 @@ public void acceptResult(TaskRuntime taskRuntime) {
// the cleanup of task should be done after unlock to avoid deadlock
@Override
public void releaseResourcesIfNecessary() {
if (this.status == OptimizingProcess.Status.FAILED
|| this.status == OptimizingProcess.Status.CLOSED) {
if (this.status == ProcessStatus.FAILED || this.status == ProcessStatus.CLOSED) {
cancelTasks();
}
}

@Override
public boolean isClosed() {
return status == OptimizingProcess.Status.CLOSED;
return status == ProcessStatus.CLOSED;
}

@Override
Expand Down Expand Up @@ -566,12 +566,12 @@ public void commit() {
try {
hasCommitted = true;
buildCommit().commit();
status = Status.SUCCESS;
status = ProcessStatus.SUCCESS;
endTime = System.currentTimeMillis();
persistProcessCompleted(true);
} catch (Exception e) {
LOG.error("{} Commit optimizing failed ", tableRuntime.getTableIdentifier(), e);
status = Status.FAILED;
status = ProcessStatus.FAILED;
failedReason = ExceptionUtil.getErrorMessage(e, 4000);
endTime = System.currentTimeMillis();
persistProcessCompleted(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@

package org.apache.amoro.server.persistence;

import static com.github.pagehelper.page.PageAutoDialect.registerDialectAlias;

import com.github.pagehelper.PageInterceptor;
import com.github.pagehelper.dialect.helper.MySqlDialect;
import com.github.pagehelper.dialect.helper.PostgreSqlDialect;
import com.github.pagehelper.dialect.helper.SqlServer2012Dialect;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.persistence.mapper.ApiTokensMapper;
Expand Down Expand Up @@ -78,6 +84,9 @@ public static SqlSessionFactoryProvider getInstance() {
private volatile SqlSessionFactory sqlSessionFactory;

public void init(Configurations config) throws SQLException {

registerDialectAliases();

BasicDataSource dataSource = new BasicDataSource();
dataSource.setUrl(config.getString(AmoroManagementConf.DB_CONNECTION_URL));
dataSource.setDriverClassName(config.getString(AmoroManagementConf.DB_DRIVER_CLASS_NAME));
Expand Down Expand Up @@ -116,6 +125,12 @@ public void init(Configurations config) throws SQLException {
configuration.addMapper(ResourceMapper.class);
configuration.addMapper(TableBlockerMapper.class);

PageInterceptor interceptor = new PageInterceptor();
Properties interceptorProperties = new Properties();
interceptorProperties.setProperty("reasonable", "false");
interceptor.setProperties(interceptorProperties);
configuration.addInterceptor(interceptor);

DatabaseIdProvider provider = new VendorDatabaseIdProvider();
Properties properties = new Properties();
properties.setProperty("MySQL", "mysql");
Expand All @@ -133,6 +148,12 @@ public void init(Configurations config) throws SQLException {
createTablesIfNeed(config);
}

private void registerDialectAliases() {
registerDialectAlias("postgres", PostgreSqlDialect.class);
registerDialectAlias("mysql", MySqlDialect.class);
registerDialectAlias("derby", SqlServer2012Dialect.class);
}

/**
* create tables for database
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.MetricsSummary;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
import org.apache.amoro.server.optimizing.OptimizingType;
Expand Down Expand Up @@ -69,7 +69,7 @@ void insertOptimizingProcess(
@Param("processId") long processId,
@Param("targetSnapshotId") long targetSnapshotId,
@Param("targetChangeSnapshotId") long targetChangeSnapshotId,
@Param("status") OptimizingProcess.Status status,
@Param("status") ProcessStatus status,
@Param("optimizingType") OptimizingType optimizingType,
@Param("planTime") long planTime,
@Param("summary") MetricsSummary summary,
Expand All @@ -85,19 +85,23 @@ void insertOptimizingProcess(
void updateOptimizingProcess(
@Param("tableId") long tableId,
@Param("processId") long processId,
@Param("optimizingStatus") OptimizingProcess.Status status,
@Param("optimizingStatus") ProcessStatus status,
@Param("endTime") long endTime,
@Param("summary") MetricsSummary summary,
@Param("failedReason") String failedReason);

@Select(
"SELECT a.process_id, a.table_id, a.catalog_name, a.db_name, a.table_name, a.target_snapshot_id,"
"<script>"
+ "SELECT a.process_id, a.table_id, a.catalog_name, a.db_name, a.table_name, a.target_snapshot_id,"
+ " a.target_change_snapshot_id, a.status, a.optimizing_type, a.plan_time, a.end_time,"
+ " a.fail_reason, a.summary, a.from_sequence, a.to_sequence FROM table_optimizing_process a"
+ " INNER JOIN table_identifier b ON a.table_id = b.table_id"
+ " WHERE a.catalog_name = #{catalogName} AND a.db_name = #{dbName} AND a.table_name = #{tableName}"
+ " AND b.catalog_name = #{catalogName} AND b.db_name = #{dbName} AND b.table_name = #{tableName}"
+ " ORDER BY process_id desc")
+ " <if test='optimizingType != null'> AND a.optimizing_type = #{optimizingType}</if>"
+ " <if test='optimizingStatus != null'> AND a.status = #{optimizingStatus}</if>"
+ " ORDER BY process_id desc"
+ "</script>")
@Results({
@Result(property = "processId", column = "process_id"),
@Result(property = "tableId", column = "table_id"),
Expand All @@ -124,7 +128,11 @@ void updateOptimizingProcess(
List<OptimizingProcessMeta> selectOptimizingProcesses(
@Param("catalogName") String catalogName,
@Param("dbName") String dbName,
@Param("tableName") String tableName);
@Param("tableName") String tableName,
@Param("optimizingType") String optimizingType,
@Param("optimizingStatus") ProcessStatus optimizingStatus,
@Param("pageNum") int pageNum,
@Param("pageSize") int pageSize);

/** Optimizing TaskRuntime operation below */
@Insert({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.amoro.AmoroTable;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.plan.OptimizingEvaluator;
import org.apache.amoro.server.table.TableManager;
Expand Down Expand Up @@ -73,8 +74,7 @@ public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration or
if (originalConfig.getOptimizingConfig().isEnabled()
&& !tableRuntime.getTableConfiguration().getOptimizingConfig().isEnabled()) {
OptimizingProcess optimizingProcess = tableRuntime.getOptimizingProcess();
if (optimizingProcess != null
&& optimizingProcess.getStatus() == OptimizingProcess.Status.RUNNING) {
if (optimizingProcess != null && optimizingProcess.getStatus() == ProcessStatus.RUNNING) {
optimizingProcess.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.amoro.io.MixedDataTestHelpers;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.optimizing.TableOptimizing;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.resource.OptimizerInstance;
Expand Down Expand Up @@ -386,7 +386,7 @@ private void assertTaskCompleted(TaskRuntime taskRuntime) {
Assertions.assertEquals(
0, optimizingService().listTasks(defaultResourceGroup().getName()).size());
Assertions.assertEquals(
OptimizingProcess.Status.RUNNING,
ProcessStatus.RUNNING,
tableService()
.getRuntime(serverTableIdentifier().getId())
.getOptimizingProcess()
Expand Down
Loading

0 comments on commit d97597f

Please sign in to comment.