Skip to content

Commit

Permalink
Merge branch 'master' into feature-swagger
Browse files: browse the repository at this point in the history
  • Loading branch information
czy006 authored Nov 6, 2024
2 parents 815c0b8 + 0efbd59 commit 2e4fa1d
Show file tree
Hide file tree
Showing 22 changed files with 337 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,13 @@ private TableOptimizingProcess planInternal(TableRuntime tableRuntime) {
try {
AmoroTable<?> table = tableManager.loadTable(tableRuntime.getTableIdentifier());
OptimizingPlanner planner =
new OptimizingPlanner(
OptimizingPlanner.createOptimizingPlanner(
tableRuntime.refresh(table),
(MixedTable) table.originalTable(),
getAvailableCore(),
maxInputSizePerThread());
if (planner.isNecessary()) {
return new TableOptimizingProcess(planner);
return new TableOptimizingProcess(planner, tableRuntime);
} else {
tableRuntime.completeEmptyProcess();
return null;
Expand Down Expand Up @@ -371,9 +371,9 @@ public TaskRuntime poll() {
}
}

public TableOptimizingProcess(OptimizingPlanner planner) {
public TableOptimizingProcess(OptimizingPlanner planner, TableRuntime tableRuntime) {
processId = planner.getProcessId();
tableRuntime = planner.getTableRuntime();
this.tableRuntime = tableRuntime;
optimizingType = planner.getOptimizingType();
planTime = planner.getPlanTime();
targetSnapshotId = planner.getTargetSnapshotId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@

package org.apache.amoro.server.optimizing.plan;

import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.server.optimizing.OptimizingType;
import org.apache.amoro.server.optimizing.RewriteStageTask;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
Expand All @@ -48,7 +48,9 @@ public abstract class AbstractPartitionPlan implements PartitionEvaluator {

protected final Pair<Integer, StructLike> partition;
protected final OptimizingConfig config;
protected final TableRuntime tableRuntime;
protected final ServerTableIdentifier identifier;
protected final long lastMinorOptimizingTime;
protected final long lastFullOptimizingTime;
private CommonPartitionEvaluator evaluator;
private TaskSplitter taskSplitter;
protected MixedTable tableObject;
Expand All @@ -74,15 +76,20 @@ public abstract class AbstractPartitionPlan implements PartitionEvaluator {
protected final Set<String> reservedDeleteFiles = Sets.newHashSet();

public AbstractPartitionPlan(
TableRuntime tableRuntime,
ServerTableIdentifier identifier,
MixedTable table,
OptimizingConfig config,
Pair<Integer, StructLike> partition,
long planTime) {
long planTime,
long lastMinorOptimizingTime,
long lastFullOptimizingTime) {
this.identifier = identifier;
this.partition = partition;
this.tableObject = table;
this.config = tableRuntime.getOptimizingConfig();
this.tableRuntime = tableRuntime;
this.config = config;
this.planTime = planTime;
this.lastMinorOptimizingTime = lastMinorOptimizingTime;
this.lastFullOptimizingTime = lastFullOptimizingTime;
}

@Override
Expand All @@ -98,7 +105,8 @@ protected CommonPartitionEvaluator evaluator() {
}

protected CommonPartitionEvaluator buildEvaluator() {
return new CommonPartitionEvaluator(tableRuntime, partition, planTime);
return new CommonPartitionEvaluator(
identifier, config, partition, planTime, lastMinorOptimizingTime, lastFullOptimizingTime);
}

@Override
Expand Down Expand Up @@ -317,10 +325,7 @@ public RewriteStageTask buildTask(OptimizingInputProperties properties) {
MixedTableUtil.getMixedTablePartitionSpecById(tableObject, partition.first());
String partitionPath = spec.partitionToPath(partition.second());
return new RewriteStageTask(
tableRuntime.getTableIdentifier().getId(),
partitionPath,
input,
properties.getProperties());
identifier.getId(), partitionPath, input, properties.getProperties());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

package org.apache.amoro.server.optimizing.plan;

import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.server.optimizing.OptimizingType;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
Expand All @@ -40,10 +40,12 @@ public class CommonPartitionEvaluator implements PartitionEvaluator {
private static final Logger LOG = LoggerFactory.getLogger(CommonPartitionEvaluator.class);

private final Set<String> deleteFileSet = Sets.newHashSet();
protected final TableRuntime tableRuntime;

private final Pair<Integer, StructLike> partition;
protected final ServerTableIdentifier identifier;
protected final OptimizingConfig config;
protected final long lastFullOptimizingTime;
protected final long lastMinorOptimizingTime;
protected final long fragmentSize;
protected final long minTargetSize;
protected final long planTime;
Expand Down Expand Up @@ -83,10 +85,15 @@ public class CommonPartitionEvaluator implements PartitionEvaluator {
private String name;

public CommonPartitionEvaluator(
TableRuntime tableRuntime, Pair<Integer, StructLike> partition, long planTime) {
ServerTableIdentifier identifier,
OptimizingConfig config,
Pair<Integer, StructLike> partition,
long planTime,
long lastMinorOptimizingTime,
long lastFullOptimizingTime) {
this.identifier = identifier;
this.config = config;
this.partition = partition;
this.tableRuntime = tableRuntime;
this.config = tableRuntime.getOptimizingConfig();
this.fragmentSize = config.getTargetSize() / config.getFragmentRatio();
this.minTargetSize = (long) (config.getTargetSize() * config.getMinTargetSizeRatio());
if (minTargetSize > config.getTargetSize() - fragmentSize) {
Expand All @@ -95,10 +102,11 @@ public CommonPartitionEvaluator(
+ "the another merge file.");
}
this.planTime = planTime;
this.lastMinorOptimizingTime = lastMinorOptimizingTime;
this.lastFullOptimizingTime = lastFullOptimizingTime;
this.reachFullInterval =
config.getFullTriggerInterval() >= 0
&& planTime - tableRuntime.getLastFullOptimizingTime()
> config.getFullTriggerInterval();
&& planTime - lastFullOptimizingTime > config.getFullTriggerInterval();
}

@Override
Expand Down Expand Up @@ -343,7 +351,7 @@ public boolean isMinorNecessary() {

protected boolean reachMinorInterval() {
return config.getMinorLeastInterval() >= 0
&& planTime - tableRuntime.getLastMinorOptimizingTime() > config.getMinorLeastInterval();
&& planTime - lastMinorOptimizingTime > config.getMinorLeastInterval();
}

protected boolean reachFullInterval() {
Expand All @@ -363,9 +371,7 @@ public boolean isFullNecessary() {

protected String name() {
if (name == null) {
name =
String.format(
"partition %s of %s", partition, tableRuntime.getTableIdentifier().toString());
name = String.format("partition %s of %s", partition, identifier.toString());
}
return name;
}
Expand Down Expand Up @@ -509,9 +515,8 @@ public String toString() {
.add("fragmentSize", fragmentSize)
.add("undersizedSegmentSize", minTargetSize)
.add("planTime", planTime)
.add("lastMinorOptimizeTime", tableRuntime.getLastMinorOptimizingTime())
.add("lastFullOptimizeTime", tableRuntime.getLastFullOptimizingTime())
.add("lastFullOptimizeTime", tableRuntime.getLastFullOptimizingTime())
.add("lastMinorOptimizeTime", lastMinorOptimizingTime)
.add("lastFullOptimizeTime", lastFullOptimizingTime)
.add("fragmentFileCount", fragmentFileCount)
.add("fragmentFileSize", fragmentFileSize)
.add("fragmentFileRecords", fragmentFileRecords)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

package org.apache.amoro.server.optimizing.plan;

import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.optimizing.IcebergRewriteExecutorFactory;
import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.table.MixedTable;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.util.Pair;
Expand All @@ -31,11 +32,21 @@
public class IcebergPartitionPlan extends AbstractPartitionPlan {

protected IcebergPartitionPlan(
TableRuntime tableRuntime,
ServerTableIdentifier identifier,
OptimizingConfig config,
MixedTable table,
Pair<Integer, StructLike> partition,
long planTime) {
super(tableRuntime, table, partition, planTime);
long planTime,
long lastMinorOptimizingTime,
long lastFullOptimizingTime) {
super(
identifier,
table,
config,
partition,
planTime,
lastMinorOptimizingTime,
lastFullOptimizingTime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@

package org.apache.amoro.server.optimizing.plan;

import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.data.DataFileType;
import org.apache.amoro.data.PrimaryKeyedFile;
import org.apache.amoro.hive.utils.HiveTableUtil;
import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.properties.HiveTableProperties;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.table.MixedTable;
import org.apache.iceberg.ContentFile;
Expand All @@ -40,12 +41,22 @@ public class MixedHivePartitionPlan extends MixedIcebergPartitionPlan {
private String customHiveSubdirectory;

public MixedHivePartitionPlan(
TableRuntime tableRuntime,
ServerTableIdentifier identifier,
MixedTable table,
OptimizingConfig config,
Pair<Integer, StructLike> partition,
String hiveLocation,
long planTime) {
super(tableRuntime, table, partition, planTime);
long planTime,
long lastMinorOptimizingTime,
long lastFullOptimizingTime) {
super(
identifier,
table,
config,
partition,
planTime,
lastMinorOptimizingTime,
lastFullOptimizingTime);
this.hiveLocation = hiveLocation;
}

Expand Down Expand Up @@ -89,7 +100,15 @@ protected MixedHivePartitionEvaluator evaluator() {
@Override
protected CommonPartitionEvaluator buildEvaluator() {
return new MixedHivePartitionEvaluator(
tableRuntime, partition, partitionProperties, hiveLocation, planTime, isKeyedTable());
identifier,
config,
partition,
partitionProperties,
hiveLocation,
planTime,
isKeyedTable(),
lastMinorOptimizingTime,
lastFullOptimizingTime);
}

@Override
Expand Down Expand Up @@ -121,13 +140,24 @@ protected static class MixedHivePartitionEvaluator extends MixedIcebergPartition
private boolean filesNotInHiveLocation = false;

public MixedHivePartitionEvaluator(
TableRuntime tableRuntime,
ServerTableIdentifier identifier,
OptimizingConfig config,
Pair<Integer, StructLike> partition,
Map<String, String> partitionProperties,
String hiveLocation,
long planTime,
boolean keyedTable) {
super(tableRuntime, partition, partitionProperties, planTime, keyedTable);
boolean keyedTable,
long lastMinorOptimizingTime,
long lastFullOptimizingTime) {
super(
identifier,
config,
partition,
partitionProperties,
planTime,
keyedTable,
lastMinorOptimizingTime,
lastFullOptimizingTime);
this.hiveLocation = hiveLocation;
String optimizedTime =
partitionProperties.get(HiveTableProperties.PARTITION_PROPERTIES_KEY_TRANSIENT_TIME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@

package org.apache.amoro.server.optimizing.plan;

import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.data.DataFileType;
import org.apache.amoro.data.DataTreeNode;
import org.apache.amoro.data.PrimaryKeyedFile;
import org.apache.amoro.hive.optimizing.MixFormatRewriteExecutorFactory;
import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
Expand All @@ -49,11 +50,21 @@ public class MixedIcebergPartitionPlan extends AbstractPartitionPlan {
protected final Map<String, String> partitionProperties;

public MixedIcebergPartitionPlan(
TableRuntime tableRuntime,
ServerTableIdentifier identifier,
MixedTable table,
OptimizingConfig config,
Pair<Integer, StructLike> partition,
long planTime) {
super(tableRuntime, table, partition, planTime);
long planTime,
long lastMinorOptimizingTime,
long lastFullOptimizingTime) {
super(
identifier,
table,
config,
partition,
planTime,
lastMinorOptimizingTime,
lastFullOptimizingTime);
this.partitionProperties = TablePropertyUtil.getPartitionProperties(table, partition.second());
}

Expand Down Expand Up @@ -101,7 +112,14 @@ protected TaskSplitter buildTaskSplitter() {
@Override
protected CommonPartitionEvaluator buildEvaluator() {
return new MixedIcebergPartitionEvaluator(
tableRuntime, partition, partitionProperties, planTime, isKeyedTable());
identifier,
config,
partition,
partitionProperties,
planTime,
isKeyedTable(),
lastMinorOptimizingTime,
lastFullOptimizingTime);
}

protected static class MixedIcebergPartitionEvaluator extends CommonPartitionEvaluator {
Expand All @@ -110,12 +128,16 @@ protected static class MixedIcebergPartitionEvaluator extends CommonPartitionEva
private final boolean reachBaseRefreshInterval;

public MixedIcebergPartitionEvaluator(
TableRuntime tableRuntime,
ServerTableIdentifier identifier,
OptimizingConfig config,
Pair<Integer, StructLike> partition,
Map<String, String> partitionProperties,
long planTime,
boolean keyedTable) {
super(tableRuntime, partition, planTime);
boolean keyedTable,
long lastMinorOptimizingTime,
long lastFullOptimizingTime) {
super(
identifier, config, partition, planTime, lastMinorOptimizingTime, lastFullOptimizingTime);
this.keyedTable = keyedTable;
String optimizedTime = partitionProperties.get(TableProperties.PARTITION_BASE_OPTIMIZED_TIME);
long lastBaseOptimizedTime = optimizedTime == null ? 0 : Long.parseLong(optimizedTime);
Expand Down
Loading

0 comments on commit 2e4fa1d

Please sign in to comment.