diff --git a/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java b/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java index 8b1c450d6cf7..bef6565fe708 100644 --- a/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java +++ b/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java @@ -41,14 +41,17 @@ public class CatalogContext { private final Options options; private final Configuration hadoopConf; + @Nullable private final FileIOLoader preferIOLoader; @Nullable private final FileIOLoader fallbackIOLoader; private CatalogContext( Options options, @Nullable Configuration hadoopConf, + @Nullable FileIOLoader preferIOLoader, @Nullable FileIOLoader fallbackIOLoader) { this.options = checkNotNull(options); this.hadoopConf = hadoopConf == null ? getHadoopConfiguration(options) : hadoopConf; + this.preferIOLoader = preferIOLoader; this.fallbackIOLoader = fallbackIOLoader; } @@ -59,20 +62,28 @@ public static CatalogContext create(Path warehouse) { } public static CatalogContext create(Options options) { - return new CatalogContext(options, null, null); + return new CatalogContext(options, null, null, null); } public static CatalogContext create(Options options, Configuration hadoopConf) { - return new CatalogContext(options, hadoopConf, null); + return new CatalogContext(options, hadoopConf, null, null); } public static CatalogContext create(Options options, FileIOLoader fallbackIOLoader) { - return new CatalogContext(options, null, fallbackIOLoader); + return new CatalogContext(options, null, null, fallbackIOLoader); } public static CatalogContext create( - Options options, Configuration hadoopConf, FileIOLoader fallbackIOLoader) { - return new CatalogContext(options, hadoopConf, fallbackIOLoader); + Options options, FileIOLoader preferIOLoader, FileIOLoader fallbackIOLoader) { + return new CatalogContext(options, null, preferIOLoader, fallbackIOLoader); + } + + public static CatalogContext create( + Options options, + Configuration hadoopConf, + FileIOLoader preferIOLoader, + FileIOLoader fallbackIOLoader) { + return new CatalogContext(options, hadoopConf, preferIOLoader, fallbackIOLoader); } public Options options() { @@ -84,6 +95,11 @@ public Configuration hadoopConf() { return hadoopConf; } + @Nullable + public FileIOLoader preferIO() { + return preferIOLoader; + } + @Nullable public FileIOLoader fallbackIO() { return fallbackIOLoader; diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index 352d5726a7dc..84c1040ea24d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -316,14 +316,25 @@ static FileIO get(Path path, CatalogContext config) throws IOException { + "')"); } - Map loaders = discoverLoaders(); - FileIOLoader loader = loaders.get(uri.getScheme()); + FileIOLoader loader = null; + List ioExceptionList = new ArrayList<>(); + + // load preferIO + FileIOLoader perferIOLoader = config.preferIO(); + try { + loader = checkAccess(perferIOLoader, path, config); + } catch (IOException ioException) { + ioExceptionList.add(ioException); + } + + if (loader == null) { + Map loaders = discoverLoaders(); + loader = loaders.get(uri.getScheme()); + } // load fallbackIO FileIOLoader fallbackIO = config.fallbackIO(); - List ioExceptionList = new ArrayList<>(); - if (loader != null) { Set options = config.options().keySet().stream() @@ -374,6 +385,13 @@ static FileIO get(Path path, CatalogContext config) throws IOException { if (loader == null) { String fallbackMsg = ""; + String preferMsg = ""; + if (perferIOLoader != null) { + preferMsg = + " " + + perferIOLoader.getClass().getSimpleName() + + " also cannot access this path."; + } if (fallbackIO != null) { fallbackMsg = " " @@ -384,8 +402,8 @@ static FileIO get(Path path, CatalogContext config) throws IOException { new UnsupportedSchemeException( String.format( "Could not find a file io implementation for scheme '%s' in the classpath." - + "%s Hadoop FileSystem also cannot access this path '%s'.", - uri.getScheme(), fallbackMsg, path)); + + "%s %s Hadoop FileSystem also cannot access this path '%s'.", + uri.getScheme(), preferMsg, fallbackMsg, path)); for (IOException ioException : ioExceptionList) { ex.addSuppressed(ioException); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java index b08da568ebbe..b123802dbac0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java @@ -25,8 +25,6 @@ import org.apache.paimon.table.source.snapshot.StartingScanner; import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult; -import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.List; @@ -93,24 +91,7 @@ private StartingScanner.Result applyPushDownLimit(StartingScanner.Result result) } SnapshotReader.Plan newPlan = - new SnapshotReader.Plan() { - @Nullable - @Override - public Long watermark() { - return plan.watermark(); - } - - @Nullable - @Override - public Long snapshotId() { - return plan.snapshotId(); - } - - @Override - public List splits() { - return limitedSplits; - } - }; + new PlanImpl(plan.watermark(), plan.snapshotId(), limitedSplits); return new ScannedResult(newPlan); } else { return result; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/PlanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/PlanImpl.java new file mode 100644 index 000000000000..84c9ece9e181 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/PlanImpl.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source; + +import org.apache.paimon.table.source.snapshot.SnapshotReader; + +import org.jetbrains.annotations.Nullable; + +import java.util.List; + +/** An implementation of {@link SnapshotReader.Plan}. */ +public class PlanImpl implements SnapshotReader.Plan { + + private final Long watermark; + private final Long snapshotId; + private final List splits; + + public PlanImpl(Long watermark, Long snapshotId, List splits) { + this.watermark = watermark; + this.snapshotId = snapshotId; + this.splits = splits; + } + + @Nullable + @Override + public Long watermark() { + return watermark; + } + + @Nullable + @Override + public Long snapshotId() { + return snapshotId; + } + + @Override + public List splits() { + return splits; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java index 17df85832de0..49ab3a87e764 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java @@ -23,8 +23,8 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.PlanImpl; import org.apache.paimon.table.source.ScanMode; -import org.apache.paimon.table.source.Split; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; @@ -78,23 +78,7 @@ public Result scan(SnapshotReader reader) { } } - return StartingScanner.fromPlan( - new SnapshotReader.Plan() { - @Override - public Long watermark() { - return null; - } - - @Override - public Long snapshotId() { - return endingSnapshotId; - } - - @Override - public List splits() { - return (List) result; - } - }); + return StartingScanner.fromPlan(new PlanImpl(null, endingSnapshotId, (List) result)); } private List readSplits(SnapshotReader reader, Snapshot s) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 241f42395628..9c8126ffe3a1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -38,9 +38,9 @@ import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.DeletionFile; +import org.apache.paimon.table.source.PlanImpl; import org.apache.paimon.table.source.RawFile; import org.apache.paimon.table.source.ScanMode; -import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.SplitGenerator; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; @@ -252,24 +252,7 @@ public Plan read() { scanMode != ScanMode.ALL, splitGenerator, files); - return new Plan() { - @Nullable - @Override - public Long watermark() { - return plan.watermark(); - } - - @Nullable - @Override - public Long snapshotId() { - return plan.snapshotId(); - } - - @Override - public List splits() { - return (List) splits; - } - }; + return new PlanImpl(plan.watermark(), plan.snapshotId(), (List) splits); } private List generateSplits( @@ -401,24 +384,7 @@ private Plan toChangesPlan( } } - return new Plan() { - @Nullable - @Override - public Long watermark() { - return plan.watermark(); - } - - @Nullable - @Override - public Long snapshotId() { - return plan.snapshotId(); - } - - @Override - public List splits() { - return (List) splits; - } - }; + return new PlanImpl(plan.watermark(), plan.snapshotId(), (List) splits); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java index 940ea3b00c43..29bec8502d46 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java @@ -120,7 +120,11 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> Collections.singletonList(new AggregationSplit(fileIO, location)); + return () -> + Collections.singletonList( + new AggregationSplit( + new SchemaManager(fileIO, location).listAllIds().size(), + location)); } } @@ -129,17 +133,17 @@ private static class AggregationSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Path location; - private AggregationSplit(FileIO fileIO, Path location) { - this.fileIO = fileIO; + private AggregationSplit(long rowCount, Path location) { + this.rowCount = rowCount; this.location = location; } @Override public long rowCount() { - return new SchemaManager(fileIO, location).listAllIds().size(); + return rowCount; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java index 5bb47e6e0d18..a7b1236bfd48 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java @@ -121,7 +121,13 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> Collections.singletonList(new AllTableSplit(fileIO, allTablePaths)); + return () -> + Collections.singletonList( + new AllTableSplit( + options(fileIO, allTablePaths).values().stream() + .flatMap(t -> t.values().stream()) + .reduce(0, (a, b) -> a + b.size(), Integer::sum), + allTablePaths)); } } @@ -129,19 +135,17 @@ private static class AllTableSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Map> allTablePaths; - private AllTableSplit(FileIO fileIO, Map> allTablePaths) { - this.fileIO = fileIO; + private AllTableSplit(long rowCount, Map> allTablePaths) { + this.rowCount = rowCount; this.allTablePaths = allTablePaths; } @Override public long rowCount() { - return options(fileIO, allTablePaths).values().stream() - .flatMap(t -> t.values().stream()) - .reduce(0, (a, b) -> a + b.size(), Integer::sum); + return rowCount; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java index da568aaff06a..7ae31095cf99 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java @@ -122,25 +122,26 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> Collections.singletonList(new BranchesSplit(fileIO, location)); + FileStoreTable table = FileStoreTableFactory.create(fileIO, location); + long rowCount = table.branchManager().branchCount(); + return () -> Collections.singletonList(new BranchesSplit(rowCount, location)); } } private static class BranchesSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Path location; - private BranchesSplit(FileIO fileIO, Path location) { - this.fileIO = fileIO; + private BranchesSplit(long rowCount, Path location) { + this.rowCount = rowCount; this.location = location; } @Override public long rowCount() { - FileStoreTable table = FileStoreTableFactory.create(fileIO, location); - return table.branchManager().branchCount(); + return rowCount; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java index 896acfccce1c..a3ec3017eb5b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java @@ -115,7 +115,10 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { return () -> - Collections.singletonList(new ConsumersTable.ConsumersSplit(fileIO, location)); + Collections.singletonList( + new ConsumersTable.ConsumersSplit( + new ConsumerManager(fileIO, location).listAllIds().size(), + location)); } } @@ -124,17 +127,17 @@ private static class ConsumersSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Path location; - private ConsumersSplit(FileIO fileIO, Path location) { - this.fileIO = fileIO; + private ConsumersSplit(long rowCount, Path location) { + this.rowCount = rowCount; this.location = location; } @Override public long rowCount() { - return new ConsumerManager(fileIO, location).listAllIds().size(); + return rowCount; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index a10a8e771ca1..0501dab08527 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -139,7 +139,8 @@ public InnerTableScan newScan() { @Override public InnerTableRead newRead() { - return new FilesRead(new SchemaManager(storeTable.fileIO(), storeTable.location())); + return new FilesRead( + new SchemaManager(storeTable.fileIO(), storeTable.location()), storeTable); } @Override @@ -185,47 +186,12 @@ public InnerTableScan withFilter(Predicate pushdown) { @Override public Plan innerPlan() { - return () -> - Collections.singletonList( - new FilesSplit( - storeTable, - partitionPredicate, - bucketPredicate, - levelPredicate)); + // plan here, just set the result of plan to split + TableScan.Plan plan = tablePlan(); + return () -> Collections.singletonList(new FilesSplit(plan.splits())); } - } - - private static class FilesSplit implements Split { - - private static final long serialVersionUID = 1L; - private final FileStoreTable storeTable; - - @Nullable private final LeafPredicate partitionPredicate; - @Nullable private final LeafPredicate bucketPredicate; - @Nullable private final LeafPredicate levelPredicate; - - private FilesSplit( - FileStoreTable storeTable, - @Nullable LeafPredicate partitionPredicate, - @Nullable LeafPredicate bucketPredicate, - @Nullable LeafPredicate levelPredicate) { - this.storeTable = storeTable; - this.partitionPredicate = partitionPredicate; - this.bucketPredicate = bucketPredicate; - this.levelPredicate = levelPredicate; - } - - @Override - public long rowCount() { - TableScan.Plan plan = plan(); - return plan.splits().stream() - .map(s -> (DataSplit) s) - .mapToLong(s -> s.dataFiles().size()) - .sum(); - } - - private TableScan.Plan plan() { + private TableScan.Plan tablePlan() { InnerTableScan scan = storeTable.newScan(); if (partitionPredicate != null) { if (partitionPredicate.function() instanceof Equal) { @@ -263,6 +229,29 @@ private TableScan.Plan plan() { } return scan.plan(); } + } + + private static class FilesSplit implements Split { + + private static final long serialVersionUID = 1L; + + private final List splits; + + private FilesSplit(List splits) { + this.splits = splits; + } + + @Override + public long rowCount() { + return splits.stream() + .map(s -> (DataSplit) s) + .mapToLong(s -> s.dataFiles().size()) + .sum(); + } + + public List splits() { + return splits; + } @Override public boolean equals(Object o) { @@ -273,12 +262,12 @@ public boolean equals(Object o) { return false; } FilesSplit that = (FilesSplit) o; - return Objects.equals(storeTable, that.storeTable); + return Objects.equals(splits, that.splits); } @Override public int hashCode() { - return Objects.hash(storeTable); + return Objects.hash(splits); } } @@ -286,10 +275,13 @@ private static class FilesRead implements InnerTableRead { private final SchemaManager schemaManager; + private final FileStoreTable storeTable; + private int[][] projection; - private FilesRead(SchemaManager schemaManager) { + private FilesRead(SchemaManager schemaManager, FileStoreTable fileStoreTable) { this.schemaManager = schemaManager; + this.storeTable = fileStoreTable; } @Override @@ -315,9 +307,7 @@ public RecordReader createReader(Split split) throws IOException { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } FilesSplit filesSplit = (FilesSplit) split; - FileStoreTable table = filesSplit.storeTable; - TableScan.Plan plan = filesSplit.plan(); - if (plan.splits().isEmpty()) { + if (filesSplit.splits().isEmpty()) { return new IteratorRecordReader<>(Collections.emptyIterator()); } @@ -326,10 +316,10 @@ public RecordReader createReader(Split split) throws IOException { // schema id directly FieldStatsConverters fieldStatsConverters = new FieldStatsConverters( - sid -> schemaManager.schema(sid).fields(), table.schema().id()); + sid -> schemaManager.schema(sid).fields(), storeTable.schema().id()); RowDataToObjectArrayConverter partitionConverter = - new RowDataToObjectArrayConverter(table.schema().logicalPartitionType()); + new RowDataToObjectArrayConverter(storeTable.schema().logicalPartitionType()); Function keyConverters = new Function() { @@ -352,7 +342,7 @@ public RowDataToObjectArrayConverter apply(Long schemaId) { }); } }; - for (Split dataSplit : plan.splits()) { + for (Split dataSplit : filesSplit.splits()) { iteratorList.add( Iterators.transform( ((DataSplit) dataSplit).dataFiles().iterator(), @@ -362,7 +352,7 @@ public RowDataToObjectArrayConverter apply(Long schemaId) { partitionConverter, keyConverters, file, - table.getSchemaFieldStats(file), + storeTable.getSchemaFieldStats(file), fieldStatsConverters))); } Iterator rows = Iterators.concat(iteratorList.iterator()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java index 8f922cc156aa..d5e9f1647b56 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java @@ -124,7 +124,10 @@ public InnerTableScan withFilter(Predicate predicate) { @Override protected Plan innerPlan() { - return () -> Collections.singletonList(new ManifestsSplit(fileIO, location, dataTable)); + return () -> + Collections.singletonList( + new ManifestsSplit( + allManifests(fileIO, location, dataTable).size(), location)); } } @@ -132,19 +135,17 @@ private static class ManifestsSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Path location; - private final Table dataTable; - private ManifestsSplit(FileIO fileIO, Path location, Table dataTable) { - this.fileIO = fileIO; + private ManifestsSplit(long rowCount, Path location) { + this.rowCount = rowCount; this.location = location; - this.dataTable = dataTable; } @Override public long rowCount() { - return allManifests(fileIO, location, dataTable).size(); + return rowCount; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java index 81dfa094c2fb..b740ddec1e2b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java @@ -112,7 +112,9 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> Collections.singletonList(new OptionsSplit(fileIO, location)); + return () -> + Collections.singletonList( + new OptionsSplit(options(fileIO, location).size(), location)); } } @@ -120,17 +122,17 @@ private static class OptionsSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Path location; - private OptionsSplit(FileIO fileIO, Path location) { - this.fileIO = fileIO; + private OptionsSplit(long rowCount, Path location) { + this.rowCount = rowCount; this.location = location; } @Override public long rowCount() { - return options(fileIO, location).size(); + return rowCount; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java index 877f9b7ef2ff..4f1899394b04 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java @@ -36,7 +36,6 @@ import org.apache.paimon.table.source.ReadOnceTableScan; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; -import org.apache.paimon.table.source.TableScan; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -110,7 +109,7 @@ public InnerTableScan newScan() { @Override public InnerTableRead newRead() { - return new PartitionsRead(); + return new PartitionsRead(storeTable); } @Override @@ -134,7 +133,9 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> Collections.singletonList(new PartitionsSplit(storeTable)); + return () -> + Collections.singletonList( + new PartitionsSplit(storeTable.newScan().plan().splits())); } } @@ -142,23 +143,22 @@ private static class PartitionsSplit implements Split { private static final long serialVersionUID = 1L; - private final FileStoreTable storeTable; + private final List splits; - private PartitionsSplit(FileStoreTable storeTable) { - this.storeTable = storeTable; + private PartitionsSplit(List splits) { + this.splits = splits; } @Override public long rowCount() { - TableScan.Plan plan = plan(); - return plan.splits().stream() + return splits.stream() .map(s -> ((DataSplit) s).partition()) .collect(Collectors.toSet()) .size(); } - private TableScan.Plan plan() { - return storeTable.newScan().plan(); + private List splits() { + return splits; } @Override @@ -170,19 +170,25 @@ public boolean equals(Object o) { return false; } PartitionsSplit that = (PartitionsSplit) o; - return Objects.equals(storeTable, that.storeTable); + return Objects.equals(splits, that.splits); } @Override public int hashCode() { - return Objects.hash(storeTable); + return Objects.hash(splits); } } private static class PartitionsRead implements InnerTableRead { + private final FileStoreTable fileStoreTable; + private int[][] projection; + public PartitionsRead(FileStoreTable table) { + this.fileStoreTable = table; + } + @Override public InnerTableRead withFilter(Predicate predicate) { // TODO @@ -206,16 +212,15 @@ public RecordReader createReader(Split split) throws IOException { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } PartitionsSplit filesSplit = (PartitionsSplit) split; - FileStoreTable table = filesSplit.storeTable; - TableScan.Plan plan = filesSplit.plan(); - if (plan.splits().isEmpty()) { + if (filesSplit.splits().isEmpty()) { return new IteratorRecordReader<>(Collections.emptyIterator()); } List> iteratorList = new ArrayList<>(); RowDataToObjectArrayConverter partitionConverter = - new RowDataToObjectArrayConverter(table.schema().logicalPartitionType()); + new RowDataToObjectArrayConverter( + fileStoreTable.schema().logicalPartitionType()); - for (Split dataSplit : plan.splits()) { + for (Split dataSplit : filesSplit.splits()) { iteratorList.add( Iterators.transform( ((DataSplit) dataSplit).dataFiles().iterator(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java index b56b48539463..127323b42b50 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java @@ -127,7 +127,11 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> Collections.singletonList(new SchemasSplit(fileIO, location)); + return () -> + Collections.singletonList( + new SchemasSplit( + new SchemaManager(fileIO, location).listAllIds().size(), + location)); } } @@ -136,17 +140,17 @@ private static class SchemasSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Path location; - private SchemasSplit(FileIO fileIO, Path location) { - this.fileIO = fileIO; + private SchemasSplit(long rowCount, Path location) { + this.rowCount = rowCount; this.location = location; } @Override public long rowCount() { - return new SchemaManager(fileIO, location).listAllIds().size(); + return rowCount; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java index ee9cff1d1d7b..8d6b545d908e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java @@ -146,7 +146,13 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> Collections.singletonList(new SnapshotsSplit(fileIO, location)); + long rowCount; + try { + rowCount = new SnapshotManager(fileIO, location).snapshotCount(); + } catch (IOException e) { + throw new RuntimeException(e); + } + return () -> Collections.singletonList(new SnapshotsSplit(rowCount, location)); } } @@ -154,21 +160,17 @@ private static class SnapshotsSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Path location; - private SnapshotsSplit(FileIO fileIO, Path location) { - this.fileIO = fileIO; + private SnapshotsSplit(long rowCount, Path location) { this.location = location; + this.rowCount = rowCount; } @Override public long rowCount() { - try { - return new SnapshotManager(fileIO, location).snapshotCount(); - } catch (IOException e) { - throw new RuntimeException(e); - } + return rowCount; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java index 8027da2f6d67..ebd8aa7af0db 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java @@ -129,24 +129,26 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> Collections.singletonList(new TagsSplit(fileIO, location)); + return () -> + Collections.singletonList( + new TagsSplit(new TagManager(fileIO, location).tagCount(), location)); } } private static class TagsSplit implements Split { private static final long serialVersionUID = 1L; - private final FileIO fileIO; + private final long rowCount; private final Path location; - private TagsSplit(FileIO fileIO, Path location) { - this.fileIO = fileIO; + private TagsSplit(long rowCount, Path location) { + this.rowCount = rowCount; this.location = location; } @Override public long rowCount() { - return new TagManager(fileIO, location).tagCount(); + return rowCount; } @Override