Skip to content

Commit

Permalink
Colstat
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Vary committed Oct 11, 2023
1 parent 90cf38c commit 5bb425d
Show file tree
Hide file tree
Showing 25 changed files with 427 additions and 56 deletions.
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ public BatchScan includeColumnStats() {
return new BatchScanAdapter(scan.includeColumnStats());
}

// Delegate to the wrapped scan, then re-wrap so callers keep getting the adapter type.
@Override
public BatchScan includeColumnStats(Collection<Integer> columns) {
  BatchScan statsScan = scan.includeColumnStats(columns);
  return new BatchScanAdapter(statsScan);
}

@Override
public BatchScan select(Collection<String> columns) {
return new BatchScanAdapter(scan.select(columns));
Expand Down
36 changes: 36 additions & 0 deletions api/src/main/java/org/apache/iceberg/ContentFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -165,6 +166,19 @@ default Long fileSequenceNumber() {
*/
F copyWithoutStats();

/**
 * Copies this file, keeping column stats only for the given columns. Manifest readers can reuse
 * file instances; use this method to copy the data while retaining lower bounds, upper bounds,
 * value counts, null value counts, and nan value counts solely for the requested column ids.
 *
 * @param statsToKeep ids of the columns whose stats should be kept
 * @return a copy of this data file with stats restricted to the given columns
 * @throws UnsupportedOperationException if the implementation does not support stats filtering
 */
default F copyWithSpecificStats(Collection<Integer> statsToKeep) {
  String className = this.getClass().getName();
  throw new UnsupportedOperationException(className + " doesn't implement copyWithSpecificStats");
}

/**
* Copies this file (potentially without file stats). Manifest readers can reuse file instances;
* use this method to copy data when collecting files from tasks.
Expand All @@ -177,4 +191,26 @@ default Long fileSequenceNumber() {
// Dispatch to the stats-preserving or stats-dropping copy depending on the flag.
default F copy(boolean withStats) {
  if (withStats) {
    return copy();
  } else {
    return copyWithoutStats();
  }
}

/**
 * Copies this file, optionally retaining stats for only a subset of columns. Manifest readers
 * can reuse file instances; use this method to copy data when collecting files from tasks.
 *
 * @param withStats when {@code false}, the copy carries no lower bounds, upper bounds, value
 *     counts, null value counts, or nan value counts
 * @param statsToKeep when {@code withStats} is {@code true} and this collection is non-null and
 *     non-empty, stats are kept only for these column ids; ignored when {@code withStats} is
 *     {@code false}
 * @return a copy of this data file with the requested subset of column stats
 */
default F copy(boolean withStats, Collection<Integer> statsToKeep) {
  if (!withStats) {
    return copyWithoutStats();
  }

  boolean keepSubset = statsToKeep != null && !statsToKeep.isEmpty();
  return keepSubset ? copyWithSpecificStats(statsToKeep) : copy();
}
}
14 changes: 14 additions & 0 deletions api/src/main/java/org/apache/iceberg/Scan.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ public interface Scan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> {
*/
ThisT includeColumnStats();

/**
 * Create a new scan from this that loads the column stats for the given columns with each data
 * file.
 *
 * <p>Column stats include: value count, null value count, lower bounds, and upper bounds.
 *
 * @param columns column ids from the table's schema
 * @return a new scan based on this that loads column stats for the given columns
 * @throws UnsupportedOperationException if the scan implementation does not support this refinement
 */
default ThisT includeColumnStats(Collection<Integer> columns) {
  String className = this.getClass().getName();
  throw new UnsupportedOperationException(className + " doesn't implement includeColumnStats");
}

/**
* Create a new scan from this that will read the given data columns. This produces an expected
* schema that includes all fields that are either selected or used by this scan's filter
Expand Down
6 changes: 6 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.lang.invoke.SerializedLambda;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -662,6 +663,11 @@ public DataFile copyWithoutStats() {
return this;
}

@Override
public DataFile copyWithSpecificStats(Collection<Integer> statsToKeep) {
// Test stub: stats filtering is irrelevant here, so reuse this instance instead of copying.
return this;
}

@Override
public List<Long> splitOffsets() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,9 @@ private CloseableIterable<ScanTask> toFileTasks(
ScanMetricsUtil.fileTask(scanMetrics(), dataFile, deleteFiles);

return new BaseFileScanTask(
copyDataFiles ? dataFile.copy(shouldReturnColumnStats()) : dataFile,
copyDataFiles
? dataFile.copy(shouldReturnColumnStats(), columnStatsToInclude())
: dataFile,
deleteFiles,
schemaString,
specString,
Expand Down
62 changes: 55 additions & 7 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
Expand All @@ -31,6 +32,7 @@
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;
Expand Down Expand Up @@ -174,8 +176,9 @@ public PartitionData copy() {
*
* @param toCopy a generic data file to copy.
* @param fullCopy whether to copy all fields or to drop column-level stats
* @param statsToKeep ids of the columns whose stats are kept
*/
BaseFile(BaseFile<F> toCopy, boolean fullCopy) {
BaseFile(BaseFile<F> toCopy, boolean fullCopy, Collection<Integer> statsToKeep) {
this.fileOrdinal = toCopy.fileOrdinal;
this.partitionSpecId = toCopy.partitionSpecId;
this.content = toCopy.content;
Expand All @@ -186,12 +189,23 @@ public PartitionData copy() {
this.recordCount = toCopy.recordCount;
this.fileSizeInBytes = toCopy.fileSizeInBytes;
if (fullCopy) {
this.columnSizes = SerializableMap.copyOf(toCopy.columnSizes);
this.valueCounts = SerializableMap.copyOf(toCopy.valueCounts);
this.nullValueCounts = SerializableMap.copyOf(toCopy.nullValueCounts);
this.nanValueCounts = SerializableMap.copyOf(toCopy.nanValueCounts);
this.lowerBounds = SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.lowerBounds));
this.upperBounds = SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.upperBounds));
if (statsToKeep == null || statsToKeep.isEmpty()) {
this.columnSizes = SerializableMap.copyOf(toCopy.columnSizes);
this.valueCounts = SerializableMap.copyOf(toCopy.valueCounts);
this.nullValueCounts = SerializableMap.copyOf(toCopy.nullValueCounts);
this.nanValueCounts = SerializableMap.copyOf(toCopy.nanValueCounts);
this.lowerBounds =
SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.lowerBounds));
this.upperBounds =
SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.upperBounds));
} else {
this.columnSizes = filteredLongMap(toCopy.columnSizes, statsToKeep);
this.valueCounts = filteredLongMap(toCopy.valueCounts, statsToKeep);
this.nullValueCounts = filteredLongMap(toCopy.nullValueCounts, statsToKeep);
this.nanValueCounts = filteredLongMap(toCopy.nanValueCounts, statsToKeep);
this.lowerBounds = filteredByteBufferMap(toCopy.lowerBounds, statsToKeep);
this.upperBounds = filteredByteBufferMap(toCopy.upperBounds, statsToKeep);
}
} else {
this.columnSizes = null;
this.valueCounts = null;
Expand Down Expand Up @@ -504,6 +518,40 @@ private static Map<Integer, ByteBuffer> toReadableByteBufferMap(Map<Integer, Byt
}
}

/**
 * Returns a serializable copy of {@code map} containing only the entries whose keys appear in
 * {@code columnIds}. Returns {@code null} when the source map is {@code null} so callers can
 * propagate "no stats tracked" unchanged.
 *
 * @param map source stats map, keyed by column id; may be null
 * @param columnIds ids of the columns whose entries should be kept
 * @return a filtered serializable copy, or null if {@code map} is null
 */
private static <V> Map<Integer, V> filteredMap(Map<Integer, V> map, Collection<Integer> columnIds) {
  if (map == null) {
    return null;
  }

  Map<Integer, V> filtered = Maps.newHashMapWithExpectedSize(columnIds.size());
  for (Integer columnId : columnIds) {
    V value = map.get(columnId);
    if (value != null) {
      filtered.put(columnId, value);
    }
  }

  return SerializableMap.copyOf(filtered);
}

/** Filters a count/size stats map (value counts, null counts, column sizes) by column id. */
private static Map<Integer, Long> filteredLongMap(
    Map<Integer, Long> map, Collection<Integer> columnIds) {
  return filteredMap(map, columnIds);
}

/**
 * Filters a bounds map (lower/upper bounds) by column id.
 *
 * <p>NOTE(review): unlike the full-copy path, the filtered result is not wrapped in
 * SerializableByteBufferMap — confirm the filtered bounds still serialize correctly.
 */
private static Map<Integer, ByteBuffer> filteredByteBufferMap(
    Map<Integer, ByteBuffer> map, Collection<Integer> columnIds) {
  return filteredMap(map, columnIds);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ private CloseableIterable<FileScanTask> appendFilesFromSnapshots(List<Snapshot>
snapshotIds.contains(manifestEntry.snapshotId())
&& manifestEntry.status() == ManifestEntry.Status.ADDED)
.specsById(table().specs())
.ignoreDeleted();
.ignoreDeleted()
.columnStatsToKeep(context().returnColumnStatsToInclude());

if (context().ignoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
.select(scanColumns())
.filterData(filter())
.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
.ignoreExisting();
.ignoreExisting()
.columnStatsToKeep(context().returnColumnStatsToInclude());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ protected boolean shouldReturnColumnStats() {
return context().returnColumnStats();
}

// Column ids whose stats should be loaded with each data file, as recorded in the scan context.
protected Collection<Integer> columnStatsToInclude() {
return context().returnColumnStatsToInclude();
}

// Whether residual filter expressions should be dropped from planned tasks (from the scan context).
protected boolean shouldIgnoreResiduals() {
return context().ignoreResiduals();
}
Expand Down Expand Up @@ -165,6 +169,14 @@ public ThisT includeColumnStats() {
return newRefinedScan(table, schema, context.shouldReturnColumnStats(true));
}

@Override
public ThisT includeColumnStats(Collection<Integer> statsNeeded) {
// Enable stats loading and record the specific column ids to keep in the refined scan's context.
return newRefinedScan(
table,
schema,
context.shouldReturnColumnStats(true).shouldReturnSpecificColumnStats(statsNeeded));
}

@Override
public ThisT select(Collection<String> columns) {
return newRefinedScan(table, schema, context.selectColumns(columns));
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/DataScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ protected ManifestGroup newManifestGroup(
.filterData(filter())
.specsById(table().specs())
.scanMetrics(scanMetrics())
.ignoreDeleted();
.ignoreDeleted()
.columnStatsToKeep(context().returnColumnStatsToInclude());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/DataTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public CloseableIterable<FileScanTask> doPlanFiles() {
.filterData(filter())
.specsById(table().specs())
.scanMetrics(scanMetrics())
.ignoreDeleted();
.ignoreDeleted()
.columnStatsToKeep(context().returnColumnStatsToInclude());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
Expand Down
17 changes: 13 additions & 4 deletions core/src/main/java/org/apache/iceberg/GenericDataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
package org.apache.iceberg;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.types.Types;

class GenericDataFile extends BaseFile<DataFile> implements DataFile {
Expand Down Expand Up @@ -67,22 +69,29 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
*
* @param toCopy a generic data file to copy.
* @param fullCopy whether to copy all fields or to drop column-level stats
* @param statsToKeep ids of the columns whose stats should be kept
*/
private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) {
super(toCopy, fullCopy);
// Copy constructor; when fullCopy is true, statsToKeep limits which column stats survive the
// copy (a null/empty collection keeps all stats).
private GenericDataFile(
GenericDataFile toCopy, boolean fullCopy, Collection<Integer> statsToKeep) {
super(toCopy, fullCopy, statsToKeep);
}

/** Constructor for Java serialization. */
GenericDataFile() {}

@Override
public DataFile copyWithoutStats() {
return new GenericDataFile(this, false /* drop stats */);
return new GenericDataFile(this, false /* drop stats */, ImmutableSet.of());
}

@Override
public DataFile copyWithSpecificStats(Collection<Integer> statsToKeep) {
// Full copy of data fields, with column stats restricted to the given column ids.
return new GenericDataFile(this, true, statsToKeep);
}

@Override
public DataFile copy() {
return new GenericDataFile(this, true /* full copy */);
return new GenericDataFile(this, true /* full copy */, ImmutableSet.of());
}

@Override
Expand Down
17 changes: 13 additions & 4 deletions core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
package org.apache.iceberg;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.types.Types;

class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
Expand Down Expand Up @@ -68,22 +70,29 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
*
* @param toCopy a generic data file to copy.
* @param fullCopy whether to copy all fields or to drop column-level stats
* @param statsToKeep ids of the columns whose stats should be kept
*/
private GenericDeleteFile(GenericDeleteFile toCopy, boolean fullCopy) {
super(toCopy, fullCopy);
// Copy constructor; when fullCopy is true, statsToKeep limits which column stats survive the
// copy (a null/empty collection keeps all stats).
private GenericDeleteFile(
GenericDeleteFile toCopy, boolean fullCopy, Collection<Integer> statsToKeep) {
super(toCopy, fullCopy, statsToKeep);
}

/** Constructor for Java serialization. */
GenericDeleteFile() {}

@Override
public DeleteFile copyWithoutStats() {
return new GenericDeleteFile(this, false /* drop stats */);
return new GenericDeleteFile(this, false /* drop stats */, ImmutableSet.of());
}

@Override
public DeleteFile copyWithSpecificStats(Collection<Integer> statsToKeep) {
// Full copy of data fields, with column stats restricted to the given column ids.
return new GenericDeleteFile(this, true, statsToKeep);
}

@Override
public DeleteFile copy() {
return new GenericDeleteFile(this, true /* full copy */);
return new GenericDeleteFile(this, true /* full copy */, ImmutableSet.of());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public CloseableIterable<FileScanTask> planFiles() {
snapshotIds.contains(manifestEntry.snapshotId())
&& manifestEntry.status() == ManifestEntry.Status.ADDED)
.specsById(table().specs())
.ignoreDeleted();
.ignoreDeleted()
.columnStatsToKeep(context().returnColumnStatsToInclude());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
Expand Down
Loading

0 comments on commit 5bb425d

Please sign in to comment.