From 899910d817d3eff40706a1d37f2e6d506bf78543 Mon Sep 17 00:00:00 2001
From: ajantha-bhat
Date: Mon, 27 Nov 2023 15:25:09 +0530
Subject: [PATCH] Core: Add a util to read/write partition stats

---
 .../org/apache/iceberg/PartitionEntry.java    | 361 ++++++++++++++++++
 .../java/org/apache/iceberg/Partitioning.java |   5 +-
 .../apache/iceberg/TestPartitionEntry.java    |  76 ++++
 .../iceberg/data/PartitionStatsUtil.java      | 148 +++++++
 .../iceberg/data/TestPartitionStatsUtil.java  | 125 ++++++
 5 files changed, 714 insertions(+), 1 deletion(-)
 create mode 100644 core/src/main/java/org/apache/iceberg/PartitionEntry.java
 create mode 100644 core/src/test/java/org/apache/iceberg/TestPartitionEntry.java
 create mode 100644 data/src/main/java/org/apache/iceberg/data/PartitionStatsUtil.java
 create mode 100644 data/src/test/java/org/apache/iceberg/data/TestPartitionStatsUtil.java

diff --git a/core/src/main/java/org/apache/iceberg/PartitionEntry.java b/core/src/main/java/org/apache/iceberg/PartitionEntry.java
new file mode 100644
index 000000000000..ac6da58c5d5a
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/PartitionEntry.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.iceberg; + +import java.util.Objects; +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.types.Types; + +public class PartitionEntry implements IndexedRecord { + private PartitionData partitionData; + private int specId; + private long dataRecordCount; + private int dataFileCount; + private long dataFileSizeInBytes; + private long posDeleteRecordCount; + private int posDeleteFileCount; + private long eqDeleteRecordCount; + private int eqDeleteFileCount; + // Optional accurate count of records in a partition after applying the delete files if any + private long totalRecordCount; + // Commit time of snapshot that last updated this partition + private long lastUpdatedAt; + // ID of snapshot that last updated this partition + private long lastUpdatedSnapshotId; + + public enum Column { + PARTITION_DATA, + SPEC_ID, + DATA_RECORD_COUNT, + DATA_FILE_COUNT, + DATA_FILE_SIZE_IN_BYTES, + POSITION_DELETE_RECORD_COUNT, + POSITION_DELETE_FILE_COUNT, + EQUALITY_DELETE_RECORD_COUNT, + EQUALITY_DELETE_FILE_COUNT, + TOTAL_RECORD_COUNT, + LAST_UPDATED_AT, + LAST_UPDATED_SNAPSHOT_ID + } + + private PartitionEntry() {} + + public static Builder builder() { + return new Builder(); + } + + public PartitionData partitionData() { + return partitionData; + } + + public int specId() { + return specId; + } + + public long dataRecordCount() { + return dataRecordCount; + } + + public int dataFileCount() { + return dataFileCount; + } + + public long dataFileSizeInBytes() { + return dataFileSizeInBytes; + } + + public long posDeleteRecordCount() { + return posDeleteRecordCount; + } + + public int posDeleteFileCount() { + return posDeleteFileCount; + } + + public long eqDeleteRecordCount() { + return eqDeleteRecordCount; + } + + public int eqDeleteFileCount() { + return eqDeleteFileCount; + } + + public long totalRecordCount() { + return totalRecordCount; + } + + public long lastUpdatedAt() { + return lastUpdatedAt; + } + + public long lastUpdatedSnapshotId() { + return lastUpdatedSnapshotId; + } + + @Override + public void put(int i, Object v) { + switch (i) { + case 0: + this.partitionData = (PartitionData) v; + return; + case 1: + this.specId = (int) v; + return; + case 2: + this.dataRecordCount = (long) v; + return; + case 3: + this.dataFileCount = (int) v; + return; + case 4: + this.dataFileSizeInBytes = (long) v; + return; + case 5: + this.posDeleteRecordCount = (long) v; + return; + case 6: + this.posDeleteFileCount = (int) v; + return; + case 7: + this.eqDeleteRecordCount = (long) v; + return; + case 8: + this.eqDeleteFileCount = (int) v; + return; + case 9: + this.totalRecordCount = (long) v; + return; + case 10: + this.lastUpdatedAt = (long) v; + return; + case 11: + this.lastUpdatedSnapshotId = (long) v; + return; + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + i); + } + } + + @Override + public Object get(int i) { + switch (i) { + case 0: + return partitionData; + case 1: + return specId; + case 2: + return dataRecordCount; + case 3: + return dataFileCount; + case 4: + return dataFileSizeInBytes; + case 5: + return posDeleteRecordCount; + case 6: + return posDeleteFileCount; + case 7: + return eqDeleteRecordCount; + case 8: + return eqDeleteFileCount; + case 9: + return totalRecordCount; + case 10: + return lastUpdatedAt; + case 11: + return lastUpdatedSnapshotId; + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + i); + } 
+ } + + @Override + public Schema getSchema() { + return prepareAvroSchema(partitionData.getPartitionType()); + } + + @Override + @SuppressWarnings("checkstyle:CyclomaticComplexity") + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (!(o instanceof PartitionEntry)) { + return false; + } + + PartitionEntry that = (PartitionEntry) o; + return partitionData.equals(that.partitionData) + && specId == that.specId + && dataRecordCount == that.dataRecordCount + && dataFileCount == that.dataFileCount + && dataFileSizeInBytes == that.dataFileSizeInBytes + && posDeleteRecordCount == that.posDeleteRecordCount + && posDeleteFileCount == that.posDeleteFileCount + && eqDeleteRecordCount == that.eqDeleteRecordCount + && eqDeleteFileCount == that.eqDeleteFileCount + && totalRecordCount == that.totalRecordCount + && lastUpdatedAt == that.lastUpdatedAt + && lastUpdatedSnapshotId == that.lastUpdatedSnapshotId; + } + + @Override + public int hashCode() { + return Objects.hash( + partitionData, + specId, + dataRecordCount, + dataFileCount, + dataFileSizeInBytes, + posDeleteRecordCount, + posDeleteFileCount, + eqDeleteRecordCount, + eqDeleteFileCount, + totalRecordCount, + lastUpdatedAt, + lastUpdatedSnapshotId); + } + + public static org.apache.iceberg.Schema icebergSchema(Types.StructType partitionType) { + if (partitionType.fields().isEmpty()) { + throw new IllegalArgumentException("getting schema for an unpartitioned table"); + } + + return new org.apache.iceberg.Schema( + Types.NestedField.required(1, Column.PARTITION_DATA.name(), partitionType), + Types.NestedField.required(2, Column.SPEC_ID.name(), Types.IntegerType.get()), + Types.NestedField.required(3, Column.DATA_RECORD_COUNT.name(), Types.LongType.get()), + Types.NestedField.required(4, Column.DATA_FILE_COUNT.name(), Types.IntegerType.get()), + Types.NestedField.required(5, Column.DATA_FILE_SIZE_IN_BYTES.name(), Types.LongType.get()), + Types.NestedField.optional( + 6, Column.POSITION_DELETE_RECORD_COUNT.name(), Types.LongType.get()), + Types.NestedField.optional( + 7, Column.POSITION_DELETE_FILE_COUNT.name(), Types.IntegerType.get()), + Types.NestedField.optional( + 8, Column.EQUALITY_DELETE_RECORD_COUNT.name(), Types.LongType.get()), + Types.NestedField.optional( + 9, Column.EQUALITY_DELETE_FILE_COUNT.name(), Types.IntegerType.get()), + Types.NestedField.optional(10, Column.TOTAL_RECORD_COUNT.name(), Types.LongType.get()), + Types.NestedField.optional(11, Column.LAST_UPDATED_AT.name(), Types.LongType.get()), + Types.NestedField.optional( + 12, Column.LAST_UPDATED_SNAPSHOT_ID.name(), Types.LongType.get())); + } + + private static Schema prepareAvroSchema(Types.StructType partitionType) { + return AvroSchemaUtil.convert(icebergSchema(partitionType), "partitionEntry"); + } + + public static class Builder { + private PartitionData partitionData; + private int specId; + private long dataRecordCount; + private int dataFileCount; + private long dataFileSizeInBytes; + private long posDeleteRecordCount; + private int posDeleteFileCount; + private long eqDeleteRecordCount; + private int eqDeleteFileCount; + private long totalRecordCount; + private long lastUpdatedAt; + private long lastUpdatedSnapshotId; + + private Builder() {} + + public Builder withPartitionData(PartitionData newPartitionData) { + this.partitionData = newPartitionData; + return this; + } + + public Builder withSpecId(int newSpecId) { + this.specId = newSpecId; + return this; + } + + public Builder withDataRecordCount(long newDataRecordCount) { + 
this.dataRecordCount = newDataRecordCount;
+      return this;
+    }
+
+    public Builder withDataFileCount(int newDataFileCount) {
+      this.dataFileCount = newDataFileCount;
+      return this;
+    }
+
+    public Builder withDataFileSizeInBytes(long newDataFileSizeInBytes) {
+      this.dataFileSizeInBytes = newDataFileSizeInBytes;
+      return this;
+    }
+
+    public Builder withPosDeleteRecordCount(Long newPosDeleteRecordCount) {
+      this.posDeleteRecordCount = newPosDeleteRecordCount;
+      return this;
+    }
+
+    public Builder withPosDeleteFileCount(Integer newPosDeleteFileCount) {
+      this.posDeleteFileCount = newPosDeleteFileCount;
+      return this;
+    }
+
+    public Builder withEqDeleteRecordCount(Long newEqDeleteRecordCount) {
+      this.eqDeleteRecordCount = newEqDeleteRecordCount;
+      return this;
+    }
+
+    public Builder withEqDeleteFileCount(Integer newEqDeleteFileCount) {
+      this.eqDeleteFileCount = newEqDeleteFileCount;
+      return this;
+    }
+
+    public Builder withTotalRecordCount(Long newTotalRecordCount) {
+      this.totalRecordCount = newTotalRecordCount;
+      return this;
+    }
+
+    public Builder withLastUpdatedAt(Long newLastUpdatedAt) {
+      this.lastUpdatedAt = newLastUpdatedAt;
+      return this;
+    }
+
+    public Builder withLastUpdatedSnapshotId(Long newLastUpdatedSnapshotId) {
+      this.lastUpdatedSnapshotId = newLastUpdatedSnapshotId;
+      return this;
+    }
+
+    // returns an empty entry that callers populate field by field via put(),
+    // for example when materializing entries while reading a stats file
+    public PartitionEntry newInstance() {
+      return new PartitionEntry();
+    }
+
+    public PartitionEntry build() {
+      PartitionEntry partition = new PartitionEntry();
+      partition.partitionData = partitionData;
+      partition.specId = specId;
+      partition.dataRecordCount = dataRecordCount;
+      partition.dataFileCount = dataFileCount;
+      partition.dataFileSizeInBytes = dataFileSizeInBytes;
+      partition.posDeleteRecordCount = posDeleteRecordCount;
+      partition.posDeleteFileCount = posDeleteFileCount;
+      partition.eqDeleteRecordCount = eqDeleteRecordCount;
+      partition.eqDeleteFileCount = eqDeleteFileCount;
+      partition.totalRecordCount = totalRecordCount;
+      partition.lastUpdatedAt = lastUpdatedAt;
+      partition.lastUpdatedSnapshotId = lastUpdatedSnapshotId;
+      return partition;
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java b/core/src/main/java/org/apache/iceberg/Partitioning.java
index 7e4fcae333d8..872eb3bb09af 100644
--- a/core/src/main/java/org/apache/iceberg/Partitioning.java
+++ b/core/src/main/java/org/apache/iceberg/Partitioning.java
@@ -238,7 +238,10 @@ public static StructType groupingKeyType(Schema schema, Collection<PartitionSpec> specs) {
   }
 
   public static StructType partitionType(Table table) {
-    Collection<PartitionSpec> specs = table.specs().values();
+    return partitionType(table.specs().values());
+  }
+
+  public static StructType partitionType(Collection<PartitionSpec> specs) {
     return buildPartitionProjectionType("table partition", specs, allFieldIds(specs));
   }
 
diff --git a/core/src/test/java/org/apache/iceberg/TestPartitionEntry.java b/core/src/test/java/org/apache/iceberg/TestPartitionEntry.java
new file mode 100644
index 000000000000..cb9fe9c9149c
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestPartitionEntry.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestPartitionEntry { + + @Test + public void testPartitionBuilder() { + Types.StructType partitionType = + Types.StructType.of( + Types.NestedField.required(1000, "field1", Types.StringType.get()), + Types.NestedField.required(1001, "field2", Types.IntegerType.get())); + PartitionData partitionData = new PartitionData(partitionType); + partitionData.set(0, "value1"); + partitionData.set(1, 42); + + PartitionEntry partition = + PartitionEntry.builder() + .withPartitionData(partitionData) + .withSpecId(123) + .withDataRecordCount(1000L) + .withDataFileCount(5) + .withDataFileSizeInBytes(1024L * 1024L) + .withPosDeleteRecordCount(50L) + .withPosDeleteFileCount(2) + .withEqDeleteRecordCount(20L) + .withEqDeleteFileCount(1) + .withTotalRecordCount(3000L) + .withLastUpdatedAt(1627900200L) + .withLastUpdatedSnapshotId(456789L) + .build(); + + // Verify the get method + Assertions.assertEquals(partitionData, partition.get(0)); + Assertions.assertEquals(123, partition.get(1)); + Assertions.assertEquals(1000L, partition.get(2)); + Assertions.assertEquals(5, partition.get(3)); + Assertions.assertEquals(1024L * 1024L, partition.get(4)); + Assertions.assertEquals(50L, partition.get(5)); + Assertions.assertEquals(2, partition.get(6)); + Assertions.assertEquals(20L, partition.get(7)); + Assertions.assertEquals(1, partition.get(8)); + Assertions.assertEquals(3000L, partition.get(9)); + Assertions.assertEquals(1627900200L, partition.get(10)); + Assertions.assertEquals(456789L, partition.get(11)); + + // Verify the put method + PartitionEntry newPartition = PartitionEntry.builder().newInstance(); + int size = partition.getSchema().getFields().size(); + for (int i = 0; i < size; i++) { + newPartition.put(i, partition.get(i)); + } + + Assertions.assertEquals(newPartition, partition); + } +} diff --git a/data/src/main/java/org/apache/iceberg/data/PartitionStatsUtil.java b/data/src/main/java/org/apache/iceberg/data/PartitionStatsUtil.java new file mode 100644 index 000000000000..83096088e442 --- /dev/null +++ b/data/src/main/java/org/apache/iceberg/data/PartitionStatsUtil.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.PartitionEntry;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetAvroValueReaders;
+import org.apache.iceberg.parquet.ParquetAvroWriter;
+import org.apache.iceberg.types.Types;
+
+public final class PartitionStatsUtil {
+
+  private static final String PARQUET_SUFFIX = ".parquet";
+
+  private PartitionStatsUtil() {}
+
+  public static OutputFile newPartitionStatsFile(
+      TableOperations ops, long snapshotId, FileFormat format) {
+    return ops.io()
+        .newOutputFile(
+            ops.metadataFileLocation(
+                format.addExtension(String.format("partition-stats-%d", snapshotId))));
+  }
+
+  public static void writePartitionStatsFile(
+      Iterator<PartitionEntry> partitions, OutputFile outputFile, Collection<PartitionSpec> specs) {
+    validateFormat(outputFile.location());
+    writeAsParquetFile(
+        PartitionEntry.icebergSchema(Partitioning.partitionType(specs)), partitions, outputFile);
+  }
+
+  private static void validateFormat(String filePath) {
+    if (!filePath.toLowerCase(Locale.ROOT).endsWith(PARQUET_SUFFIX)) {
+      throw new UnsupportedOperationException("Unsupported format: " + filePath);
+    }
+  }
+
+  public static CloseableIterable<PartitionEntry> readPartitionStatsFile(
+      Schema schema, InputFile inputFile) {
+    validateFormat(inputFile.location());
+    // The schema of the partition columns at read time can differ from the schema used at
+    // write time due to partition evolution; while reading, ParquetAvroValueReaders fills
+    // in the data according to the latest schema.
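+    // Illustrative example (not from this patch): if a table partitioned by identity("id")
+    // later evolves to add identity("category"), the unified partition type carries both
+    // fields, so entries written before the evolution come back with a null "category".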
+    CloseableIterable<GenericData.Record> records =
+        Parquet.read(inputFile)
+            .project(schema)
+            .createReaderFunc(fileSchema -> ParquetAvroValueReaders.buildReader(schema, fileSchema))
+            .build();
+
+    return CloseableIterable.transform(records, record -> toPartition(schema, record));
+  }
+
+  private static PartitionEntry toPartition(Schema schema, GenericData.Record record) {
+    PartitionEntry partition = PartitionEntry.builder().newInstance();
+    partition.put(
+        PartitionEntry.Column.PARTITION_DATA.ordinal(),
+        extractPartitionDataFromRecord(schema, record));
+
+    // field 0 holds the partition data; copy the remaining stats fields positionally
+    int fieldCount = record.getSchema().getFields().size();
+    for (int columnIndex = 1; columnIndex < fieldCount; columnIndex++) {
+      partition.put(columnIndex, record.get(columnIndex));
+    }
+
+    return partition;
+  }
+
+  private static PartitionData extractPartitionDataFromRecord(
+      Schema schema, GenericData.Record record) {
+    int partitionDataCount =
+        record
+            .getSchema()
+            .getField(PartitionEntry.Column.PARTITION_DATA.name())
+            .schema()
+            .getFields()
+            .size();
+    PartitionData partitionData =
+        new PartitionData(
+            (Types.StructType)
+                schema.findField(PartitionEntry.Column.PARTITION_DATA.name()).type());
+    for (int partitionColIndex = 0; partitionColIndex < partitionDataCount; partitionColIndex++) {
+      partitionData.set(
+          partitionColIndex,
+          ((GenericData.Record) record.get(PartitionEntry.Column.PARTITION_DATA.ordinal()))
+              .get(partitionColIndex));
+    }
+
+    return partitionData;
+  }
+
+  private static void writeAsParquetFile(
+      Schema schema, Iterator<PartitionEntry> records, OutputFile outputFile) {
+    try (DataWriter<PartitionEntry> dataWriter =
+        Parquet.writeData(outputFile)
+            .schema(schema)
+            .createWriterFunc(ParquetAvroWriter::buildWriter)
+            .overwrite()
+            .withSpec(PartitionSpec.unpartitioned())
+            .withSortOrder(sortOrder(schema))
+            .build()) {
+      records.forEachRemaining(dataWriter::write);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  private static SortOrder sortOrder(Schema schema) {
+    SortOrder.Builder builder = SortOrder.builderFor(schema);
+    List<Types.NestedField> partitionFields =
+        ((Types.StructType) schema.asStruct().fields().get(0).type()).fields();
+    partitionFields.forEach(
+        field -> builder.asc(PartitionEntry.Column.PARTITION_DATA.name() + "." + field.name()));
+
+    return builder.build();
+  }
+}
diff --git a/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsUtil.java b/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsUtil.java
new file mode 100644
index 000000000000..f7eafaaa3017
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/data/TestPartitionStatsUtil.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Random;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.PartitionEntry;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TestTables;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestPartitionStatsUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(TestPartitionStatsUtil.class);
+
+  private static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.LongType.get()),
+          Types.NestedField.optional(2, "data", Types.StringType.get()),
+          Types.NestedField.optional(3, "binary", Types.BinaryType.get()));
+
+  @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+  @Test
+  public void testPartitionStats() throws Exception {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").identity("binary").build();
+    Table testTable =
+        TestTables.create(
+            temp.newFolder("test_partition_stats"),
+            "test_partition_stats",
+            SCHEMA,
+            spec,
+            SortOrder.unsorted(),
+            2);
+
+    Schema schema =
+        PartitionEntry.icebergSchema(Partitioning.partitionType(testTable.specs().values()));
+
+    ImmutableList.Builder<PartitionEntry> partitionListBuilder = ImmutableList.builder();
+
+    long seed = System.currentTimeMillis();
+    LOG.info("Seed used for random generator is {}", seed);
+    Random random = new Random(seed);
+
+    for (int i = 0; i < 42; i++) {
+      PartitionData partitionData =
+          new PartitionData(
+              schema.findField(PartitionEntry.Column.PARTITION_DATA.name()).type().asStructType());
+      partitionData.set(0, random.nextLong());
+
+      PartitionEntry partition =
+          PartitionEntry.builder()
+              .withPartitionData(partitionData)
+              .withSpecId(random.nextInt(10))
+              .withDataRecordCount(random.nextLong())
+              .withDataFileCount(random.nextInt())
+              .withDataFileSizeInBytes(1024L * random.nextInt(20))
+              .withPosDeleteRecordCount(random.nextLong())
+              .withPosDeleteFileCount(random.nextInt())
+              .withEqDeleteRecordCount(random.nextLong())
+              .withEqDeleteFileCount(random.nextInt())
+              .withTotalRecordCount(random.nextLong())
+              .withLastUpdatedAt(random.nextLong())
+              .withLastUpdatedSnapshotId(random.nextLong())
+              .build();
+
+      partitionListBuilder.add(partition);
+    }
+    List<PartitionEntry> records = partitionListBuilder.build();
+
+    OutputFile outputFile =
+        PartitionStatsUtil.newPartitionStatsFile(
+            ((HasTableOperations) testTable).operations(), 42L, FileFormat.PARQUET);
+    PartitionStatsUtil.writePartitionStatsFile(
+        records.iterator(), outputFile, testTable.specs().values());
+
+    Assertions.assertThat(Paths.get(outputFile.location())).exists();
+
+    List<PartitionEntry> rows;
+    try (CloseableIterable<PartitionEntry> recordIterator =
+        PartitionStatsUtil.readPartitionStatsFile(
+            schema, Files.localInput(outputFile.location()))) {
+      rows = Lists.newArrayList(recordIterator);
+    }
+
+    // entries must round-trip: same count and same field values, in write order
+    Assertions.assertThat(rows).hasSize(records.size());
+    for (int i = 0; i < records.size(); i++) {
+      Assertions.assertThat(rows.get(i)).isEqualTo(records.get(i));
+    }
+  }
+}
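
Reviewer note, not part of the diff: a minimal sketch of the intended call sequence,
assuming a partitioned Table named "table" and a prepared List<PartitionEntry> "entries"
(for example, built the way the test above builds them). All names are illustrative.

    // write: one stats file per snapshot, named partition-stats-<snapshotId>.parquet
    Collection<PartitionSpec> specs = table.specs().values();
    Schema statsSchema = PartitionEntry.icebergSchema(Partitioning.partitionType(specs));
    OutputFile statsFile =
        PartitionStatsUtil.newPartitionStatsFile(
            ((HasTableOperations) table).operations(),
            table.currentSnapshot().snapshotId(),
            FileFormat.PARQUET);
    PartitionStatsUtil.writePartitionStatsFile(entries.iterator(), statsFile, specs);

    // read: entries come back projected onto the table's latest partition type
    try (CloseableIterable<PartitionEntry> read =
        PartitionStatsUtil.readPartitionStatsFile(
            statsSchema, Files.localInput(statsFile.location()))) {
      read.forEach(entry -> System.out.println(entry.dataRecordCount()));
    }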