From db33a52b0f9fe66f259bbea84c2e03cee39f63cf Mon Sep 17 00:00:00 2001 From: baiyangtx Date: Mon, 4 Nov 2024 11:38:53 +0800 Subject: [PATCH 1/3] [AMORO-3316]: Decouple planer with table runtime (#3318) * Remove table-runtime from planner and evaluator * Fix Compile error * Fix UT Compile error * fix ut compile error * spotless * ut error --- .../server/optimizing/OptimizingQueue.java | 8 +- .../plan/AbstractPartitionPlan.java | 27 +++-- .../plan/CommonPartitionEvaluator.java | 33 +++--- .../optimizing/plan/IcebergPartitionPlan.java | 19 +++- .../plan/MixedHivePartitionPlan.java | 46 ++++++-- .../plan/MixedIcebergPartitionPlan.java | 38 +++++-- .../optimizing/plan/OptimizingEvaluator.java | 61 +++++++--- .../optimizing/plan/OptimizingPlanner.java | 105 +++++++++++++++--- .../executor/TableRuntimeRefreshExecutor.java | 2 +- .../flow/CompleteOptimizingFlow.java | 2 +- .../plan/TestHiveKeyedPartitionPlan.java | 7 +- .../plan/TestHiveUnkeyedPartitionPlan.java | 7 +- .../plan/TestIcebergPartitionPlan.java | 10 +- .../plan/TestKeyedPartitionPlan.java | 8 +- .../plan/TestOptimizingEvaluator.java | 12 +- .../plan/TestOptimizingPlanner.java | 2 +- .../plan/TestUnkeyedPartitionPlan.java | 8 +- 17 files changed, 306 insertions(+), 89 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index 710ec729db..c024908a7f 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -264,13 +264,13 @@ private TableOptimizingProcess planInternal(TableRuntime tableRuntime) { try { AmoroTable table = tableManager.loadTable(tableRuntime.getTableIdentifier()); OptimizingPlanner planner = - new OptimizingPlanner( + OptimizingPlanner.createOptimizingPlanner( tableRuntime.refresh(table), (MixedTable) table.originalTable(), getAvailableCore(), maxInputSizePerThread()); if (planner.isNecessary()) { - return new TableOptimizingProcess(planner); + return new TableOptimizingProcess(planner, tableRuntime); } else { tableRuntime.completeEmptyProcess(); return null; @@ -371,9 +371,9 @@ public TaskRuntime poll() { } } - public TableOptimizingProcess(OptimizingPlanner planner) { + public TableOptimizingProcess(OptimizingPlanner planner, TableRuntime tableRuntime) { processId = planner.getProcessId(); - tableRuntime = planner.getTableRuntime(); + this.tableRuntime = tableRuntime; optimizingType = planner.getOptimizingType(); planTime = planner.getPlanTime(); targetSnapshotId = planner.getTargetSnapshotId(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/AbstractPartitionPlan.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/AbstractPartitionPlan.java index 262270281b..27cebc6d0b 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/AbstractPartitionPlan.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/AbstractPartitionPlan.java @@ -18,12 +18,12 @@ package org.apache.amoro.server.optimizing.plan; +import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.config.OptimizingConfig; import org.apache.amoro.optimizing.OptimizingInputProperties; import org.apache.amoro.optimizing.RewriteFilesInput; import org.apache.amoro.server.optimizing.OptimizingType; import org.apache.amoro.server.optimizing.RewriteStageTask; -import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; @@ -48,7 +48,9 @@ public abstract class AbstractPartitionPlan implements PartitionEvaluator { protected final Pair partition; protected final OptimizingConfig config; - protected final TableRuntime tableRuntime; + protected final ServerTableIdentifier identifier; + protected final long lastMinorOptimizingTime; + protected final long lastFullOptimizingTime; private CommonPartitionEvaluator evaluator; private TaskSplitter taskSplitter; protected MixedTable tableObject; @@ -74,15 +76,20 @@ public abstract class AbstractPartitionPlan implements PartitionEvaluator { protected final Set reservedDeleteFiles = Sets.newHashSet(); public AbstractPartitionPlan( - TableRuntime tableRuntime, + ServerTableIdentifier identifier, MixedTable table, + OptimizingConfig config, Pair partition, - long planTime) { + long planTime, + long lastMinorOptimizingTime, + long lastFullOptimizingTime) { + this.identifier = identifier; this.partition = partition; this.tableObject = table; - this.config = tableRuntime.getOptimizingConfig(); - this.tableRuntime = tableRuntime; + this.config = config; this.planTime = planTime; + this.lastMinorOptimizingTime = lastMinorOptimizingTime; + this.lastFullOptimizingTime = lastFullOptimizingTime; } @Override @@ -98,7 +105,8 @@ protected CommonPartitionEvaluator evaluator() { } protected CommonPartitionEvaluator buildEvaluator() { - return new CommonPartitionEvaluator(tableRuntime, partition, planTime); + return new CommonPartitionEvaluator( + identifier, config, partition, planTime, lastMinorOptimizingTime, lastFullOptimizingTime); } @Override @@ -317,10 +325,7 @@ public RewriteStageTask buildTask(OptimizingInputProperties properties) { MixedTableUtil.getMixedTablePartitionSpecById(tableObject, partition.first()); String partitionPath = spec.partitionToPath(partition.second()); return new RewriteStageTask( - tableRuntime.getTableIdentifier().getId(), - partitionPath, - input, - properties.getProperties()); + identifier.getId(), partitionPath, input, properties.getProperties()); } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java index 8038e97618..1c48718a77 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/CommonPartitionEvaluator.java @@ -18,9 +18,9 @@ package org.apache.amoro.server.optimizing.plan; +import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.config.OptimizingConfig; import org.apache.amoro.server.optimizing.OptimizingType; -import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; @@ -40,10 +40,12 @@ public class CommonPartitionEvaluator implements PartitionEvaluator { private static final Logger LOG = LoggerFactory.getLogger(CommonPartitionEvaluator.class); private final Set deleteFileSet = Sets.newHashSet(); - protected final TableRuntime tableRuntime; private final Pair partition; + protected final ServerTableIdentifier identifier; protected final OptimizingConfig config; + protected final long lastFullOptimizingTime; + protected final long lastMinorOptimizingTime; protected final long fragmentSize; protected final long minTargetSize; protected final long planTime; @@ -83,10 +85,15 @@ public class CommonPartitionEvaluator implements PartitionEvaluator { private String name; public CommonPartitionEvaluator( - TableRuntime tableRuntime, Pair partition, long planTime) { + ServerTableIdentifier identifier, + OptimizingConfig config, + Pair partition, + long planTime, + long lastMinorOptimizingTime, + long lastFullOptimizingTime) { + this.identifier = identifier; + this.config = config; this.partition = partition; - this.tableRuntime = tableRuntime; - this.config = tableRuntime.getOptimizingConfig(); this.fragmentSize = config.getTargetSize() / config.getFragmentRatio(); this.minTargetSize = (long) (config.getTargetSize() * config.getMinTargetSizeRatio()); if (minTargetSize > config.getTargetSize() - fragmentSize) { @@ -95,10 +102,11 @@ public CommonPartitionEvaluator( + "the another merge file."); } this.planTime = planTime; + this.lastMinorOptimizingTime = lastMinorOptimizingTime; + this.lastFullOptimizingTime = lastFullOptimizingTime; this.reachFullInterval = config.getFullTriggerInterval() >= 0 - && planTime - tableRuntime.getLastFullOptimizingTime() - > config.getFullTriggerInterval(); + && planTime - lastFullOptimizingTime > config.getFullTriggerInterval(); } @Override @@ -343,7 +351,7 @@ public boolean isMinorNecessary() { protected boolean reachMinorInterval() { return config.getMinorLeastInterval() >= 0 - && planTime - tableRuntime.getLastMinorOptimizingTime() > config.getMinorLeastInterval(); + && planTime - lastMinorOptimizingTime > config.getMinorLeastInterval(); } protected boolean reachFullInterval() { @@ -363,9 +371,7 @@ public boolean isFullNecessary() { protected String name() { if (name == null) { - name = - String.format( - "partition %s of %s", partition, tableRuntime.getTableIdentifier().toString()); + name = String.format("partition %s of %s", partition, identifier.toString()); } return name; } @@ -509,9 +515,8 @@ public String toString() { .add("fragmentSize", fragmentSize) .add("undersizedSegmentSize", minTargetSize) .add("planTime", planTime) - .add("lastMinorOptimizeTime", tableRuntime.getLastMinorOptimizingTime()) - .add("lastFullOptimizeTime", tableRuntime.getLastFullOptimizingTime()) - .add("lastFullOptimizeTime", tableRuntime.getLastFullOptimizingTime()) + .add("lastMinorOptimizeTime", lastMinorOptimizingTime) + .add("lastFullOptimizeTime", lastFullOptimizingTime) .add("fragmentFileCount", fragmentFileCount) .add("fragmentFileSize", fragmentFileSize) .add("fragmentFileRecords", fragmentFileRecords) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/IcebergPartitionPlan.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/IcebergPartitionPlan.java index 590efa190b..239584f855 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/IcebergPartitionPlan.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/IcebergPartitionPlan.java @@ -18,9 +18,10 @@ package org.apache.amoro.server.optimizing.plan; +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.config.OptimizingConfig; import org.apache.amoro.optimizing.IcebergRewriteExecutorFactory; import org.apache.amoro.optimizing.OptimizingInputProperties; -import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.table.MixedTable; import org.apache.iceberg.StructLike; import org.apache.iceberg.util.Pair; @@ -31,11 +32,21 @@ public class IcebergPartitionPlan extends AbstractPartitionPlan { protected IcebergPartitionPlan( - TableRuntime tableRuntime, + ServerTableIdentifier identifier, + OptimizingConfig config, MixedTable table, Pair partition, - long planTime) { - super(tableRuntime, table, partition, planTime); + long planTime, + long lastMinorOptimizingTime, + long lastFullOptimizingTime) { + super( + identifier, + table, + config, + partition, + planTime, + lastMinorOptimizingTime, + lastFullOptimizingTime); } @Override diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedHivePartitionPlan.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedHivePartitionPlan.java index dba9672e1c..a3b9af7bc8 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedHivePartitionPlan.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedHivePartitionPlan.java @@ -18,12 +18,13 @@ package org.apache.amoro.server.optimizing.plan; +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.config.OptimizingConfig; import org.apache.amoro.data.DataFileType; import org.apache.amoro.data.PrimaryKeyedFile; import org.apache.amoro.hive.utils.HiveTableUtil; import org.apache.amoro.optimizing.OptimizingInputProperties; import org.apache.amoro.properties.HiveTableProperties; -import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.table.MixedTable; import org.apache.iceberg.ContentFile; @@ -40,12 +41,22 @@ public class MixedHivePartitionPlan extends MixedIcebergPartitionPlan { private String customHiveSubdirectory; public MixedHivePartitionPlan( - TableRuntime tableRuntime, + ServerTableIdentifier identifier, MixedTable table, + OptimizingConfig config, Pair partition, String hiveLocation, - long planTime) { - super(tableRuntime, table, partition, planTime); + long planTime, + long lastMinorOptimizingTime, + long lastFullOptimizingTime) { + super( + identifier, + table, + config, + partition, + planTime, + lastMinorOptimizingTime, + lastFullOptimizingTime); this.hiveLocation = hiveLocation; } @@ -89,7 +100,15 @@ protected MixedHivePartitionEvaluator evaluator() { @Override protected CommonPartitionEvaluator buildEvaluator() { return new MixedHivePartitionEvaluator( - tableRuntime, partition, partitionProperties, hiveLocation, planTime, isKeyedTable()); + identifier, + config, + partition, + partitionProperties, + hiveLocation, + planTime, + isKeyedTable(), + lastMinorOptimizingTime, + lastFullOptimizingTime); } @Override @@ -121,13 +140,24 @@ protected static class MixedHivePartitionEvaluator extends MixedIcebergPartition private boolean filesNotInHiveLocation = false; public MixedHivePartitionEvaluator( - TableRuntime tableRuntime, + ServerTableIdentifier identifier, + OptimizingConfig config, Pair partition, Map partitionProperties, String hiveLocation, long planTime, - boolean keyedTable) { - super(tableRuntime, partition, partitionProperties, planTime, keyedTable); + boolean keyedTable, + long lastMinorOptimizingTime, + long lastFullOptimizingTime) { + super( + identifier, + config, + partition, + partitionProperties, + planTime, + keyedTable, + lastMinorOptimizingTime, + lastFullOptimizingTime); this.hiveLocation = hiveLocation; String optimizedTime = partitionProperties.get(HiveTableProperties.PARTITION_PROPERTIES_KEY_TRANSIENT_TIME); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedIcebergPartitionPlan.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedIcebergPartitionPlan.java index 9892b3d92f..282214fff1 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedIcebergPartitionPlan.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/MixedIcebergPartitionPlan.java @@ -18,12 +18,13 @@ package org.apache.amoro.server.optimizing.plan; +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.config.OptimizingConfig; import org.apache.amoro.data.DataFileType; import org.apache.amoro.data.DataTreeNode; import org.apache.amoro.data.PrimaryKeyedFile; import org.apache.amoro.hive.optimizing.MixFormatRewriteExecutorFactory; import org.apache.amoro.optimizing.OptimizingInputProperties; -import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; @@ -49,11 +50,21 @@ public class MixedIcebergPartitionPlan extends AbstractPartitionPlan { protected final Map partitionProperties; public MixedIcebergPartitionPlan( - TableRuntime tableRuntime, + ServerTableIdentifier identifier, MixedTable table, + OptimizingConfig config, Pair partition, - long planTime) { - super(tableRuntime, table, partition, planTime); + long planTime, + long lastMinorOptimizingTime, + long lastFullOptimizingTime) { + super( + identifier, + table, + config, + partition, + planTime, + lastMinorOptimizingTime, + lastFullOptimizingTime); this.partitionProperties = TablePropertyUtil.getPartitionProperties(table, partition.second()); } @@ -101,7 +112,14 @@ protected TaskSplitter buildTaskSplitter() { @Override protected CommonPartitionEvaluator buildEvaluator() { return new MixedIcebergPartitionEvaluator( - tableRuntime, partition, partitionProperties, planTime, isKeyedTable()); + identifier, + config, + partition, + partitionProperties, + planTime, + isKeyedTable(), + lastMinorOptimizingTime, + lastFullOptimizingTime); } protected static class MixedIcebergPartitionEvaluator extends CommonPartitionEvaluator { @@ -110,12 +128,16 @@ protected static class MixedIcebergPartitionEvaluator extends CommonPartitionEva private final boolean reachBaseRefreshInterval; public MixedIcebergPartitionEvaluator( - TableRuntime tableRuntime, + ServerTableIdentifier identifier, + OptimizingConfig config, Pair partition, Map partitionProperties, long planTime, - boolean keyedTable) { - super(tableRuntime, partition, planTime); + boolean keyedTable, + long lastMinorOptimizingTime, + long lastFullOptimizingTime) { + super( + identifier, config, partition, planTime, lastMinorOptimizingTime, lastFullOptimizingTime); this.keyedTable = keyedTable; String optimizedTime = partitionProperties.get(TableProperties.PARTITION_BASE_OPTIMIZED_TIME); long lastBaseOptimizedTime = optimizedTime == null ? 0 : Long.parseLong(optimizedTime); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java index f404592fcf..5914d57bb9 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingEvaluator.java @@ -18,7 +18,9 @@ package org.apache.amoro.server.optimizing.plan; +import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; +import org.apache.amoro.config.OptimizingConfig; import org.apache.amoro.hive.table.SupportHive; import org.apache.amoro.hive.utils.TableTypeUtil; import org.apache.amoro.server.optimizing.scan.IcebergTableFileScanHelper; @@ -59,25 +61,44 @@ public class OptimizingEvaluator { private static final Logger LOG = LoggerFactory.getLogger(OptimizingEvaluator.class); + protected final ServerTableIdentifier identifier; + protected final OptimizingConfig config; protected final MixedTable mixedTable; - protected final TableRuntime tableRuntime; protected final TableSnapshot currentSnapshot; + protected final long lastFullOptimizingTime; + protected final long lastMinorOptimizingTime; protected final int maxPendingPartitions; protected boolean isInitialized = false; - protected Map needOptimizingPlanMap = Maps.newHashMap(); protected Map partitionPlanMap = Maps.newHashMap(); - public OptimizingEvaluator( + public static OptimizingEvaluator createOptimizingEvaluator( TableRuntime tableRuntime, MixedTable table, int maxPendingPartitions) { - this.tableRuntime = tableRuntime; - this.mixedTable = table; - this.currentSnapshot = IcebergTableUtil.getSnapshot(table, tableRuntime); - this.maxPendingPartitions = maxPendingPartitions; + return new OptimizingEvaluator( + tableRuntime.getTableIdentifier(), + tableRuntime.getOptimizingConfig(), + table, + IcebergTableUtil.getSnapshot(table, tableRuntime), + maxPendingPartitions, + tableRuntime.getLastMinorOptimizingTime(), + tableRuntime.getLastFullOptimizingTime()); } - public TableRuntime getTableRuntime() { - return tableRuntime; + public OptimizingEvaluator( + ServerTableIdentifier identifier, + OptimizingConfig config, + MixedTable table, + TableSnapshot currentSnapshot, + int maxPendingPartitions, + long lastMinorOptimizingTime, + long lastFullOptimizingTime) { + this.identifier = identifier; + this.config = config; + this.mixedTable = table; + this.currentSnapshot = currentSnapshot; + this.maxPendingPartitions = maxPendingPartitions; + this.lastFullOptimizingTime = lastFullOptimizingTime; + this.lastMinorOptimizingTime = lastMinorOptimizingTime; } protected void initEvaluator() { @@ -150,25 +171,37 @@ private Map partitionProperties(Pair partit protected PartitionEvaluator buildEvaluator(Pair partition) { if (TableFormat.ICEBERG.equals(mixedTable.format())) { - return new CommonPartitionEvaluator(tableRuntime, partition, System.currentTimeMillis()); + return new CommonPartitionEvaluator( + identifier, + config, + partition, + System.currentTimeMillis(), + lastMinorOptimizingTime, + lastFullOptimizingTime); } else { Map partitionProperties = partitionProperties(partition); if (TableTypeUtil.isHive(mixedTable)) { String hiveLocation = (((SupportHive) mixedTable).hiveLocation()); return new MixedHivePartitionPlan.MixedHivePartitionEvaluator( - tableRuntime, + identifier, + config, partition, partitionProperties, hiveLocation, System.currentTimeMillis(), - mixedTable.isKeyedTable()); + mixedTable.isKeyedTable(), + lastMinorOptimizingTime, + lastFullOptimizingTime); } else { return new MixedIcebergPartitionPlan.MixedIcebergPartitionEvaluator( - tableRuntime, + identifier, + config, partition, partitionProperties, System.currentTimeMillis(), - mixedTable.isKeyedTable()); + mixedTable.isKeyedTable(), + lastMinorOptimizingTime, + lastFullOptimizingTime); } } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java index cf6fc9e713..13bb8f1d77 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/plan/OptimizingPlanner.java @@ -18,7 +18,9 @@ package org.apache.amoro.server.optimizing.plan; +import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; +import org.apache.amoro.config.OptimizingConfig; import org.apache.amoro.hive.table.SupportHive; import org.apache.amoro.hive.utils.TableTypeUtil; import org.apache.amoro.server.AmoroServiceConstants; @@ -26,6 +28,8 @@ import org.apache.amoro.server.optimizing.RewriteStageTask; import org.apache.amoro.server.table.KeyedTableSnapshot; import org.apache.amoro.server.table.TableRuntime; +import org.apache.amoro.server.table.TableSnapshot; +import org.apache.amoro.server.utils.IcebergTableUtil; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.table.MixedTable; import org.apache.amoro.utils.ExpressionUtil; @@ -49,24 +53,21 @@ public class OptimizingPlanner extends OptimizingEvaluator { private static final Logger LOG = LoggerFactory.getLogger(OptimizingPlanner.class); private final Expression partitionFilter; - protected long processId; private final double availableCore; private final long planTime; private OptimizingType optimizingType; private final PartitionPlannerFactory partitionPlannerFactory; private List tasks; - private List actualPartitionPlans; private final long maxInputSizePerThread; - public OptimizingPlanner( + public static OptimizingPlanner createOptimizingPlanner( TableRuntime tableRuntime, MixedTable table, double availableCore, long maxInputSizePerThread) { - super(tableRuntime, table, Integer.MAX_VALUE); - this.partitionFilter = + Expression partitionFilter = tableRuntime.getPendingInput() == null ? Expressions.alwaysTrue() : tableRuntime.getPendingInput().getPartitions().entrySet().stream() @@ -76,10 +77,52 @@ public OptimizingPlanner( table, entry.getKey(), entry.getValue())) .reduce(Expressions::or) .orElse(Expressions.alwaysTrue()); + long planTime = System.currentTimeMillis(); + + return new OptimizingPlanner( + tableRuntime.getTableIdentifier(), + tableRuntime.getOptimizingConfig(), + table, + IcebergTableUtil.getSnapshot(table, tableRuntime), + partitionFilter, + Math.max(tableRuntime.getNewestProcessId() + 1, planTime), + availableCore, + maxInputSizePerThread, + tableRuntime.getLastMinorOptimizingTime(), + tableRuntime.getLastFullOptimizingTime()); + } + + public OptimizingPlanner( + ServerTableIdentifier identifier, + OptimizingConfig config, + MixedTable table, + TableSnapshot snapshot, + Expression partitionFilter, + long processId, + double availableCore, + long maxInputSizePerThread, + long lastMinorOptimizingTime, + long lastFullOptimizingTime) { + super( + identifier, + config, + table, + snapshot, + Integer.MAX_VALUE, + lastMinorOptimizingTime, + lastFullOptimizingTime); + this.partitionFilter = partitionFilter; this.availableCore = availableCore; this.planTime = System.currentTimeMillis(); - this.processId = Math.max(tableRuntime.getNewestProcessId() + 1, planTime); - this.partitionPlannerFactory = new PartitionPlannerFactory(mixedTable, tableRuntime, planTime); + this.processId = processId; + this.partitionPlannerFactory = + new PartitionPlannerFactory( + identifier, + config, + mixedTable, + planTime, + lastMinorOptimizingTime, + lastFullOptimizingTime); this.maxInputSizePerThread = maxInputSizePerThread; } @@ -150,7 +193,7 @@ public List planTasks() { initEvaluator(); } if (!super.isNecessary()) { - LOG.debug("Table {} skip planning", tableRuntime.getTableIdentifier()); + LOG.debug("Table {} skip planning", identifier); return cacheAndReturnTasks(Collections.emptyList()); } @@ -188,7 +231,7 @@ public List planTasks() { long endTime = System.nanoTime(); LOG.info( "{} finish plan, type = {}, get {} tasks, cost {} ns, {} ms maxInputSize {} actualInputSize {}", - tableRuntime.getTableIdentifier(), + identifier, getOptimizingType(), tasks.size(), endTime - startTime, @@ -216,16 +259,27 @@ public long getProcessId() { } private static class PartitionPlannerFactory { + protected final OptimizingConfig config; + protected final ServerTableIdentifier identifier; + protected final long lastMinorOptimizingTime; + protected final long lastFullOptimizingTime; private final MixedTable mixedTable; - private final TableRuntime tableRuntime; private final String hiveLocation; private final long planTime; public PartitionPlannerFactory( - MixedTable mixedTable, TableRuntime tableRuntime, long planTime) { + ServerTableIdentifier identifier, + OptimizingConfig config, + MixedTable mixedTable, + long planTime, + long lastMinorOptimizingTime, + long lastFullOptimizingTime) { + this.identifier = identifier; + this.config = config; this.mixedTable = mixedTable; - this.tableRuntime = tableRuntime; this.planTime = planTime; + this.lastFullOptimizingTime = lastFullOptimizingTime; + this.lastMinorOptimizingTime = lastMinorOptimizingTime; if (TableTypeUtil.isHive(mixedTable)) { this.hiveLocation = (((SupportHive) mixedTable).hiveLocation()); } else { @@ -235,13 +289,34 @@ public PartitionPlannerFactory( public PartitionEvaluator buildPartitionPlanner(Pair partition) { if (TableFormat.ICEBERG.equals(mixedTable.format())) { - return new IcebergPartitionPlan(tableRuntime, mixedTable, partition, planTime); + return new IcebergPartitionPlan( + identifier, + config, + mixedTable, + partition, + planTime, + lastMinorOptimizingTime, + lastFullOptimizingTime); } else { if (TableTypeUtil.isHive(mixedTable)) { return new MixedHivePartitionPlan( - tableRuntime, mixedTable, partition, hiveLocation, planTime); + identifier, + mixedTable, + config, + partition, + hiveLocation, + planTime, + lastMinorOptimizingTime, + lastFullOptimizingTime); } else { - return new MixedIcebergPartitionPlan(tableRuntime, mixedTable, partition, planTime); + return new MixedIcebergPartitionPlan( + identifier, + mixedTable, + config, + partition, + planTime, + lastMinorOptimizingTime, + lastFullOptimizingTime); } } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java index b290323ab3..36261a09b1 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java @@ -53,7 +53,7 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) { private void tryEvaluatingPendingInput(TableRuntime tableRuntime, MixedTable table) { if (tableRuntime.isOptimizingEnabled() && !tableRuntime.getOptimizingStatus().isProcessing()) { OptimizingEvaluator evaluator = - new OptimizingEvaluator(tableRuntime, table, maxPendingPartitions); + OptimizingEvaluator.createOptimizingEvaluator(tableRuntime, table, maxPendingPartitions); if (evaluator.isNecessary()) { OptimizingEvaluator.PendingInput pendingInput = evaluator.getOptimizingPendingInput(); logger.debug( diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java index 41765bf5a4..0a8ed6474b 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/flow/CompleteOptimizingFlow.java @@ -192,7 +192,7 @@ private OptimizingPlanner planner() { Mockito.when(tableRuntime.getOptimizingConfig()).thenAnswer(f -> optimizingConfig()); Mockito.when(tableRuntime.getTableIdentifier()) .thenReturn(ServerTableIdentifier.of(1L, "a", "b", "c", table.format())); - return new OptimizingPlanner( + return OptimizingPlanner.createOptimizingPlanner( tableRuntime, table, availableCore, diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java index 82ee12a7c0..b1cbfbd670 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveKeyedPartitionPlan.java @@ -81,11 +81,14 @@ protected AbstractPartitionPlan getPartitionPlan() { SupportHive hiveTable = (SupportHive) getMixedTable(); String hiveLocation = hiveTable.hiveLocation(); return new MixedHivePartitionPlan( - getTableRuntime(), + getTableRuntime().getTableIdentifier(), getMixedTable(), + getTableRuntime().getOptimizingConfig(), getPartition(), hiveLocation, - System.currentTimeMillis()); + System.currentTimeMillis(), + getTableRuntime().getLastMinorOptimizingTime(), + getTableRuntime().getLastFullOptimizingTime()); } @Test diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java index 6109486e70..23c2039f20 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestHiveUnkeyedPartitionPlan.java @@ -80,11 +80,14 @@ protected AbstractPartitionPlan getPartitionPlan() { SupportHive hiveTable = (SupportHive) getMixedTable(); String hiveLocation = hiveTable.hiveLocation(); return new MixedHivePartitionPlan( - getTableRuntime(), + getTableRuntime().getTableIdentifier(), getMixedTable(), + getTableRuntime().getOptimizingConfig(), getPartition(), hiveLocation, - System.currentTimeMillis()); + System.currentTimeMillis(), + getTableRuntime().getLastMinorOptimizingTime(), + getTableRuntime().getLastFullOptimizingTime()); } @Test diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java index 857935e2b8..98ae4a4fc3 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestIcebergPartitionPlan.java @@ -27,6 +27,7 @@ import org.apache.amoro.optimizing.OptimizingInputProperties; import org.apache.amoro.server.optimizing.scan.IcebergTableFileScanHelper; import org.apache.amoro.server.optimizing.scan.TableFileScanHelper; +import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.server.utils.IcebergTableUtil; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.junit.Test; @@ -72,8 +73,15 @@ public void testOnlyOneFragmentFiles() { @Override protected AbstractPartitionPlan getPartitionPlan() { + TableRuntime tableRuntime = getTableRuntime(); return new IcebergPartitionPlan( - getTableRuntime(), getMixedTable(), getPartition(), System.currentTimeMillis()); + tableRuntime.getTableIdentifier(), + tableRuntime.getOptimizingConfig(), + getMixedTable(), + getPartition(), + System.currentTimeMillis(), + tableRuntime.getLastMinorOptimizingTime(), + tableRuntime.getLastFullOptimizingTime()); } @Override diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java index 6a169d7db5..3b7509fafb 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestKeyedPartitionPlan.java @@ -210,7 +210,13 @@ protected KeyedTable getMixedTable() { @Override protected AbstractPartitionPlan getPartitionPlan() { return new MixedIcebergPartitionPlan( - getTableRuntime(), getMixedTable(), getPartition(), System.currentTimeMillis()); + getTableRuntime().getTableIdentifier(), + getMixedTable(), + getTableRuntime().getOptimizingConfig(), + getPartition(), + System.currentTimeMillis(), + getTableRuntime().getLastMinorOptimizingTime(), + getTableRuntime().getLastFullOptimizingTime()); } @Override diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java index 395aa653db..920de8f64a 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java @@ -27,6 +27,8 @@ import org.apache.amoro.server.optimizing.scan.KeyedTableFileScanHelper; import org.apache.amoro.server.optimizing.scan.TableFileScanHelper; import org.apache.amoro.server.optimizing.scan.UnkeyedTableFileScanHelper; +import org.apache.amoro.server.table.TableSnapshot; +import org.apache.amoro.server.utils.IcebergTableUtil; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; @@ -110,7 +112,15 @@ public void testFragmentFiles() { } protected OptimizingEvaluator buildOptimizingEvaluator() { - return new OptimizingEvaluator(getTableRuntime(), getMixedTable(), 100); + TableSnapshot snapshot = IcebergTableUtil.getSnapshot(getMixedTable(), tableRuntime); + return new OptimizingEvaluator( + tableRuntime.getTableIdentifier(), + tableRuntime.getOptimizingConfig(), + getMixedTable(), + snapshot, + 100, + tableRuntime.getLastMinorOptimizingTime(), + tableRuntime.getLastFullOptimizingTime()); } protected void assertEmptyInput(OptimizingEvaluator.PendingInput input) { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java index b49061608d..ae5545f6ed 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingPlanner.java @@ -96,7 +96,7 @@ public void testFragmentFiles() { @Override protected OptimizingPlanner buildOptimizingEvaluator() { - return new OptimizingPlanner( + return OptimizingPlanner.createOptimizingPlanner( getTableRuntime(), getMixedTable(), 1, diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java index 6224bd24e8..c240cce267 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestUnkeyedPartitionPlan.java @@ -75,7 +75,13 @@ public void testOnlyOneFragmentFiles() { @Override protected AbstractPartitionPlan getPartitionPlan() { return new MixedIcebergPartitionPlan( - getTableRuntime(), getMixedTable(), getPartition(), System.currentTimeMillis()); + getTableRuntime().getTableIdentifier(), + getMixedTable(), + getTableRuntime().getOptimizingConfig(), + getPartition(), + System.currentTimeMillis(), + getTableRuntime().getLastMinorOptimizingTime(), + getTableRuntime().getLastFullOptimizingTime()); } @Override From 549f4b88ae709182a1f32cd3f64a4e27768a834a Mon Sep 17 00:00:00 2001 From: Wang Tao Date: Tue, 5 Nov 2024 11:15:53 +0800 Subject: [PATCH 2/3] [AMORO-3309][Improvement] Support for terminal integration with LDAP authentication in Kyuubi (#3309) * Support for terminal integration with Kyuubi using LDAP authentication * Support for terminal integration with Kyuubi using LDAP authentication * support helm configuration --- .../kyuubi/KyuubiTerminalSessionFactory.java | 11 +++++++++-- charts/amoro/templates/amoro-configmap.yaml | 3 +++ charts/amoro/values.yaml | 6 ++++++ docs/admin-guides/using-kyuubi.md | 12 ++++++++++++ 4 files changed, 30 insertions(+), 2 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/kyuubi/KyuubiTerminalSessionFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/kyuubi/KyuubiTerminalSessionFactory.java index 965f34982c..260758a850 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/kyuubi/KyuubiTerminalSessionFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/kyuubi/KyuubiTerminalSessionFactory.java @@ -48,6 +48,9 @@ public class KyuubiTerminalSessionFactory implements TerminalSessionFactory { public static ConfigOption KERBEROS_ENABLE = ConfigOptions.key("kerberos.enabled").booleanType().defaultValue(false); + public static ConfigOption LDAP_ENABLE = + ConfigOptions.key("ldap.enabled").booleanType().defaultValue(false); + public static ConfigOption KERBEROS_PROXY_ENABLE = ConfigOptions.key("kerberos.proxy.enabled") .booleanType() @@ -79,6 +82,7 @@ public class KyuubiTerminalSessionFactory implements TerminalSessionFactory { private String jdbcUrl; private boolean kyuubiKerberosEnable; private boolean proxyKerberosEnable; + private boolean ldapEnabled; private String username; private String password; @@ -98,6 +102,7 @@ public void initialize(Configurations properties) { this.proxyKerberosEnable = properties.getBoolean(KERBEROS_PROXY_ENABLE); this.username = properties.get(KYUUBI_USERNAME); this.password = properties.get(KYUUBI_PASSWORD); + this.ldapEnabled = properties.get(LDAP_ENABLE); try { this.params = Utils.extractURLComponents(jdbcUrl, new Properties()); } catch (SQLException e) { @@ -109,7 +114,7 @@ public void initialize(Configurations properties) { public TerminalSession create(TableMetaStore metaStore, Configurations configuration) { List logs = Lists.newArrayList(); JdbcConnectionParams connectionParams = new JdbcConnectionParams(this.params); - if (metaStore.isKerberosAuthMethod()) { + if (!this.ldapEnabled && metaStore.isKerberosAuthMethod()) { checkAndFillKerberosInfo(connectionParams, metaStore); } @@ -124,7 +129,9 @@ public TerminalSession create(TableMetaStore metaStore, Configurations configura sessionConf.put("jdbc.url", kyuubiJdbcUrl); Properties properties = new Properties(); - if (!metaStore.isKerberosAuthMethod() && Objects.nonNull(metaStore.getHadoopUsername())) { + if (!this.ldapEnabled + && !metaStore.isKerberosAuthMethod() + && Objects.nonNull(metaStore.getHadoopUsername())) { properties.put(JdbcConnectionParams.AUTH_USER, metaStore.getHadoopUsername()); sessionConf.put(JdbcConnectionParams.AUTH_USER, metaStore.getHadoopUsername()); } diff --git a/charts/amoro/templates/amoro-configmap.yaml b/charts/amoro/templates/amoro-configmap.yaml index 1ffb9710d1..24d3a6744b 100644 --- a/charts/amoro/templates/amoro-configmap.yaml +++ b/charts/amoro/templates/amoro-configmap.yaml @@ -99,6 +99,9 @@ data: {{- if eq .Values.amoroConf.terminal.backend "local" }} local.spark.sql.iceberg.handle-timestamp-without-timezone: {{ .Values.amoroConf.terminal.icebergHandleTimestampWithoutTimezone }} {{- end }} + {{- if hasKey .Values.amoroConf.terminal "kyuubiLdapEnabled" }} + kyuubi.ldap.enabled: {{ .Values.amoroConf.terminal.kyuubiLdapEnabled}} + {{- end }} {{- if eq .Values.amoroConf.terminal.backend "kyuubi" }} kyuubi.jdbc.url: {{ .Values.amoroConf.terminal.kyuubiJdbcUrl | quote }} {{- end }} diff --git a/charts/amoro/values.yaml b/charts/amoro/values.yaml index 9e7681d7f6..a7b937b8f4 100644 --- a/charts/amoro/values.yaml +++ b/charts/amoro/values.yaml @@ -177,6 +177,12 @@ amoroConf: ## backend: kyuubi ## kyuubiJdbcUrl: jdbc:hive2://127.0.0.1:10009/ + ## Kyuubi terminal backend configuration with ldap authentication. + ## terminal: + ## backend: kyuubi + ## kyuubiLdapEnabled: true + ## kyuubiJdbcUrl:jdbc:hive2://127.0.0.1:10009/default?user=test;password=test; + ## @param amoroDefaults The value (templated string) is used for conf.yaml file ## ref: https://github.com/apache/amoro/blob/master/dist/src/main/amoro-bin/conf/config.yaml ## diff --git a/docs/admin-guides/using-kyuubi.md b/docs/admin-guides/using-kyuubi.md index f5768210e0..44ccfde439 100644 --- a/docs/admin-guides/using-kyuubi.md +++ b/docs/admin-guides/using-kyuubi.md @@ -50,3 +50,15 @@ To execute SQL in Terminal, you can refer to the following steps:: - Click the Execute button to run the SQL; ![terminal](../images/admin/terminal_introduce.png) + +## LDAP Authentication +Except for the configuration of Kerberos authentication, everything else is the same. You can integrate with LDAP using the following configuration: +set kyuubi.ldap.enabled to true, and then specify the username and password for LDAP in the URL. +```shell +ams: + terminal: + backend: kyuubi + kyuubi.ldap.enabled: true + kyuubi.jdbc.url: jdbc:hive2://127.0.0.1:10009/default;user=test;password=test # kyuubi Connection Address +``` + From 0efbd59560d8916ce68a82364eccd94aacb92f67 Mon Sep 17 00:00:00 2001 From: ConradJam Date: Wed, 6 Nov 2024 16:38:40 +0800 Subject: [PATCH 3/3] [hotfix] fix vue ams get detail of SnapshotId url error (#3327) Co-authored-by: ConradJam --- amoro-web/src/services/table.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amoro-web/src/services/table.service.ts b/amoro-web/src/services/table.service.ts index 309a05d476..ded3d1f7fd 100644 --- a/amoro-web/src/services/table.service.ts +++ b/amoro-web/src/services/table.service.ts @@ -135,7 +135,7 @@ export function getDetailBySnapshotId( }, ) { const { catalog, db, table, snapshotId, page, pageSize, ref, token } = params - return request.get(`api/v1/tables/catalogs/${catalog}/dbs/${db}/tables/${table}/snapshots/${snapshotId}/detail`, { params: { page, pageSize, ref, token } }) + return request.get(`api/ams/v1/tables/catalogs/${catalog}/dbs/${db}/tables/${table}/snapshots/${snapshotId}/detail`, { params: { page, pageSize, ref, token } }) } // get operations export function getOperations(