Merge branch 'master' into feature/catalog-and-table-repr
jayceslesar committed Sep 22, 2023
2 parents 6d7acfc + 4cc609a commit e6cc3ff
Showing 120 changed files with 3,085 additions and 943 deletions.
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/DeleteFiles.java
@@ -81,4 +81,15 @@ default DeleteFiles deleteFile(DataFile file) {
    * @return this for method chaining
    */
   DeleteFiles caseSensitive(boolean caseSensitive);
+
+  /**
+   * Enables validation that any files that are part of the deletion still exist when committing the
+   * operation.
+   *
+   * @return this for method chaining
+   */
+  default DeleteFiles validateFilesExist() {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement validateFilesExist");
+  }
 }
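
On the caller side, the new hook chains with the rest of the DeleteFiles API. A minimal sketch, assuming a Table handle and a hypothetical file path; the commit is expected to fail if a file slated for deletion no longer exists in table metadata, for example after a concurrent commit:

    // Sketch only: remove a data file and require it to still exist at commit time.
    table
        .newDelete()
        .deleteFile("s3://bucket/db/t/data/file-a.parquet") // hypothetical path
        .validateFilesExist()
        .commit();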
29 changes: 20 additions & 9 deletions core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg;
 
+import static org.apache.iceberg.PlanningMode.AUTO;
 import static org.apache.iceberg.TableProperties.DATA_PLANNING_MODE;
 import static org.apache.iceberg.TableProperties.DELETE_PLANNING_MODE;
 import static org.apache.iceberg.TableProperties.PLANNING_MODE_DEFAULT;
@@ -144,20 +145,19 @@ protected PlanningMode deletePlanningMode() {
   protected CloseableIterable<ScanTask> doPlanFiles() {
     Snapshot snapshot = snapshot();
 
-    List<ManifestFile> dataManifests = findMatchingDataManifests(snapshot);
-    boolean planDataLocally = shouldPlanLocally(dataPlanningMode(), dataManifests);
-
     List<ManifestFile> deleteManifests = findMatchingDeleteManifests(snapshot);
-    boolean planDeletesLocally = shouldPlanLocally(deletePlanningMode(), deleteManifests);
+    boolean mayHaveEqualityDeletes = deleteManifests.size() > 0 && mayHaveEqualityDeletes(snapshot);
+    boolean planDeletesLocally = shouldPlanDeletesLocally(deleteManifests, mayHaveEqualityDeletes);
+
+    List<ManifestFile> dataManifests = findMatchingDataManifests(snapshot);
+    boolean loadColumnStats = mayHaveEqualityDeletes || shouldReturnColumnStats();
+    boolean planDataLocally = shouldPlanDataLocally(dataManifests, loadColumnStats);
+    boolean copyDataFiles = shouldCopyDataFiles(planDataLocally, loadColumnStats);
 
     if (planDataLocally && planDeletesLocally) {
       return planFileTasksLocally(dataManifests, deleteManifests);
     }
 
-    boolean mayHaveEqualityDeletes = deleteManifests.size() > 0 && mayHaveEqualityDeletes(snapshot);
-    boolean loadColumnStats = mayHaveEqualityDeletes || shouldReturnColumnStats();
-    boolean copyDataFiles = shouldCopyDataFiles(planDataLocally, loadColumnStats);
-
     ExecutorService monitorPool = newMonitorPool();
 
     CompletableFuture<DeleteFileIndex> deletesFuture =
@@ -223,7 +223,18 @@ private List<ManifestFile> filterManifests(List<ManifestFile> manifests) {
         .collect(Collectors.toList());
   }
 
-  protected boolean shouldPlanLocally(PlanningMode mode, List<ManifestFile> manifests) {
+  private boolean shouldPlanDeletesLocally(
+      List<ManifestFile> deleteManifests, boolean mayHaveEqualityDeletes) {
+    PlanningMode mode = deletePlanningMode();
+    return (mode == AUTO && mayHaveEqualityDeletes) || shouldPlanLocally(mode, deleteManifests);
+  }
+
+  private boolean shouldPlanDataLocally(List<ManifestFile> dataManifests, boolean loadColumnStats) {
+    PlanningMode mode = dataPlanningMode();
+    return (mode == AUTO && loadColumnStats) || shouldPlanLocally(mode, dataManifests);
+  }
+
+  private boolean shouldPlanLocally(PlanningMode mode, List<ManifestFile> manifests) {
     if (context().planWithCustomizedExecutor()) {
       return true;
     }
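
For context, dataPlanningMode() and deletePlanningMode() come from the read.data-planning-mode and read.delete-planning-mode table properties. With this change, AUTO resolves to local planning for deletes when equality deletes may exist, and for data when column stats must be loaded. A sketch of pinning the modes explicitly instead of relying on AUTO, assuming the accepted values are local, distributed, and auto:

    // Sketch only: force distributed data planning and local delete planning.
    table
        .updateProperties()
        .set("read.data-planning-mode", "distributed")
        .set("read.delete-planning-mode", "local")
        .commit();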
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -53,6 +53,10 @@ public BaseTable(TableOperations ops, String name, MetricsReporter reporter) {
     this.reporter = reporter;
   }
 
+  MetricsReporter reporter() {
+    return reporter;
+  }
+
   @Override
   public TableOperations operations() {
     return ops;
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
@@ -53,6 +53,11 @@ public String uuid() {
     public void applyTo(TableMetadata.Builder metadataBuilder) {
       metadataBuilder.assignUUID(uuid);
     }
+
+    @Override
+    public void applyTo(ViewMetadata.Builder metadataBuilder) {
+      metadataBuilder.assignUUID(uuid);
+    }
   }
 
   class UpgradeFormatVersion implements MetadataUpdate {
30 changes: 24 additions & 6 deletions core/src/main/java/org/apache/iceberg/MetricsUtil.java
@@ -41,18 +41,36 @@ public class MetricsUtil {
   private MetricsUtil() {}
 
   /**
-   * Copies a metrics object without lower and upper bounds for given fields.
+   * Copies a metrics object without value, NULL and NaN counts for given fields.
    *
-   * @param excludedFieldIds field IDs for which the lower and upper bounds must be dropped
+   * @param excludedFieldIds field IDs for which the counts must be dropped
+   * @return a new metrics object without counts for given fields
+   */
+  public static Metrics copyWithoutFieldCounts(Metrics metrics, Set<Integer> excludedFieldIds) {
+    return new Metrics(
+        metrics.recordCount(),
+        metrics.columnSizes(),
+        copyWithoutKeys(metrics.valueCounts(), excludedFieldIds),
+        copyWithoutKeys(metrics.nullValueCounts(), excludedFieldIds),
+        copyWithoutKeys(metrics.nanValueCounts(), excludedFieldIds),
+        metrics.lowerBounds(),
+        metrics.upperBounds());
+  }
+
+  /**
+   * Copies a metrics object without counts and bounds for given fields.
+   *
+   * @param excludedFieldIds field IDs for which the counts and bounds must be dropped
    * @return a new metrics object without lower and upper bounds for given fields
    */
-  public static Metrics copyWithoutFieldBounds(Metrics metrics, Set<Integer> excludedFieldIds) {
+  public static Metrics copyWithoutFieldCountsAndBounds(
+      Metrics metrics, Set<Integer> excludedFieldIds) {
     return new Metrics(
         metrics.recordCount(),
         metrics.columnSizes(),
-        metrics.valueCounts(),
-        metrics.nullValueCounts(),
-        metrics.nanValueCounts(),
+        copyWithoutKeys(metrics.valueCounts(), excludedFieldIds),
+        copyWithoutKeys(metrics.nullValueCounts(), excludedFieldIds),
+        copyWithoutKeys(metrics.nanValueCounts(), excludedFieldIds),
         copyWithoutKeys(metrics.lowerBounds(), excludedFieldIds),
         copyWithoutKeys(metrics.upperBounds(), excludedFieldIds));
   }
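
After the rename, the two helpers differ only in whether the lower/upper bounds survive the copy. A small sketch of the contrast, assuming an existing Metrics instance named metrics and hypothetical field IDs 1 and 2:

    Set<Integer> ids = ImmutableSet.of(1, 2); // hypothetical field IDs

    // Drops value/NULL/NaN counts for fields 1 and 2; keeps their bounds.
    Metrics withoutCounts = MetricsUtil.copyWithoutFieldCounts(metrics, ids);

    // Drops the counts and the lower/upper bounds for fields 1 and 2.
    Metrics withoutCountsAndBounds = MetricsUtil.copyWithoutFieldCountsAndBounds(metrics, ids);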
15 changes: 15 additions & 0 deletions core/src/main/java/org/apache/iceberg/StreamingDelete.java
@@ -28,6 +28,8 @@
  * CommitFailedException}.
  */
 public class StreamingDelete extends MergingSnapshotProducer<DeleteFiles> implements DeleteFiles {
+  private boolean validateFilesToDeleteExist = false;
+
   protected StreamingDelete(String tableName, TableOperations ops) {
     super(tableName, ops);
   }
@@ -60,9 +62,22 @@ public StreamingDelete deleteFromRowFilter(Expression expr) {
     return this;
   }
 
+  @Override
+  public DeleteFiles validateFilesExist() {
+    this.validateFilesToDeleteExist = true;
+    return this;
+  }
+
   @Override
   public StreamingDelete toBranch(String branch) {
     targetBranch(branch);
     return this;
   }
+
+  @Override
+  protected void validate(TableMetadata base, Snapshot parent) {
+    if (validateFilesToDeleteExist) {
+      failMissingDeletePaths();
+    }
+  }
 }
21 changes: 18 additions & 3 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -68,7 +68,7 @@ public static TableMetadata newTableMetadata(
         PropertyUtil.propertyAsInt(
             properties, TableProperties.FORMAT_VERSION, DEFAULT_TABLE_FORMAT_VERSION);
     return newTableMetadata(
-        schema, spec, sortOrder, location, unreservedProperties(properties), formatVersion);
+        schema, spec, sortOrder, location, persistedProperties(properties), formatVersion);
   }
 
   public static TableMetadata newTableMetadata(
@@ -78,7 +78,7 @@ public static TableMetadata newTableMetadata(
         PropertyUtil.propertyAsInt(
             properties, TableProperties.FORMAT_VERSION, DEFAULT_TABLE_FORMAT_VERSION);
     return newTableMetadata(
-        schema, spec, sortOrder, location, unreservedProperties(properties), formatVersion);
+        schema, spec, sortOrder, location, persistedProperties(properties), formatVersion);
   }
 
   private static Map<String, String> unreservedProperties(Map<String, String> rawProperties) {
@@ -87,6 +87,21 @@ private static Map<String, String> unreservedProperties(Map<String, String> rawProperties) {
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   }
 
+  private static Map<String, String> persistedProperties(Map<String, String> rawProperties) {
+    Map<String, String> persistedProperties = Maps.newHashMap();
+
+    // explicitly set defaults that apply only to new tables
+    persistedProperties.put(
+        TableProperties.PARQUET_COMPRESSION,
+        TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
+
+    rawProperties.entrySet().stream()
+        .filter(entry -> !TableProperties.RESERVED_PROPERTIES.contains(entry.getKey()))
+        .forEach(entry -> persistedProperties.put(entry.getKey(), entry.getValue()));
+
+    return persistedProperties;
+  }
+
   static TableMetadata newTableMetadata(
       Schema schema,
       PartitionSpec spec,
@@ -685,7 +700,7 @@ public TableMetadata buildReplacement(
         .setDefaultPartitionSpec(freshSpec)
         .setDefaultSortOrder(freshSortOrder)
         .setLocation(newLocation)
-        .setProperties(unreservedProperties(updatedProperties))
+        .setProperties(persistedProperties(updatedProperties))
         .build();
   }

1 change: 1 addition & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -143,6 +143,7 @@ private TableProperties() {}
   public static final String PARQUET_COMPRESSION = "write.parquet.compression-codec";
   public static final String DELETE_PARQUET_COMPRESSION = "write.delete.parquet.compression-codec";
   public static final String PARQUET_COMPRESSION_DEFAULT = "gzip";
+  public static final String PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0 = "zstd";
 
   public static final String PARQUET_COMPRESSION_LEVEL = "write.parquet.compression-level";
   public static final String DELETE_PARQUET_COMPRESSION_LEVEL =
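
Together with the persistedProperties() change in TableMetadata above, this makes zstd the persisted Parquet compression codec for newly created tables, while existing tables keep whatever codec they already carry. A sketch of opting a new table back into gzip at creation time, assuming an existing catalog, schema, and partition spec (the identifier is hypothetical):

    // Sketch only: override the new zstd default when creating a table.
    Map<String, String> props =
        ImmutableMap.of(TableProperties.PARQUET_COMPRESSION, "gzip");
    Table table =
        catalog.createTable(TableIdentifier.of("db", "events"), schema, spec, props);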
core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
@@ -47,7 +47,7 @@
  * records, consider using {@link SortingPositionOnlyDeleteWriter} instead.
  */
 public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWriteResult> {
-  private static final Set<Integer> SINGLE_REFERENCED_FILE_BOUNDS_ONLY =
+  private static final Set<Integer> FILE_AND_POS_FIELD_IDS =
       ImmutableSet.of(DELETE_FILE_PATH.fieldId(), DELETE_FILE_POS.fieldId());
 
   private final FileAppender<StructLike> appender;
@@ -121,9 +121,9 @@ public DeleteWriteResult result() {
   private Metrics metrics() {
     Metrics metrics = appender.metrics();
     if (referencedDataFiles.size() > 1) {
-      return MetricsUtil.copyWithoutFieldBounds(metrics, SINGLE_REFERENCED_FILE_BOUNDS_ONLY);
+      return MetricsUtil.copyWithoutFieldCountsAndBounds(metrics, FILE_AND_POS_FIELD_IDS);
     } else {
-      return metrics;
+      return MetricsUtil.copyWithoutFieldCounts(metrics, FILE_AND_POS_FIELD_IDS);
     }
   }
 }
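
The intent suggested by the two branches: value, NULL, and NaN counts for the file_path and pos columns are always dropped, since for these required columns they add nothing beyond the record count, while their lower/upper bounds are kept only when the delete file references a single data file, where they remain useful for pruning.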
38 changes: 30 additions & 8 deletions core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
@@ -23,6 +23,7 @@

 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
@@ -35,7 +36,7 @@ public class OutputFileFactory {
   private final PartitionSpec defaultSpec;
   private final FileFormat format;
   private final LocationProvider locations;
-  private final FileIO io;
+  private final Supplier<FileIO> ioSupplier;
   private final EncryptionManager encryptionManager;
   private final int partitionId;
   private final long taskId;
@@ -56,7 +57,7 @@ public class OutputFileFactory {
    * @param spec Partition specification used by the location provider
    * @param format File format used for the extension
    * @param locations Location provider used for generating locations
-   * @param io FileIO to store the files
+   * @param ioSupplier Supplier of FileIO to store the files
    * @param encryptionManager Encryption manager used for encrypting the files
    * @param partitionId First part of the file name
    * @param taskId Second part of the file name
@@ -67,7 +68,7 @@ private OutputFileFactory(
       PartitionSpec spec,
       FileFormat format,
       LocationProvider locations,
-      FileIO io,
+      Supplier<FileIO> ioSupplier,
       EncryptionManager encryptionManager,
       int partitionId,
       long taskId,
@@ -76,7 +77,7 @@ private OutputFileFactory(
     this.defaultSpec = spec;
     this.format = format;
     this.locations = locations;
-    this.io = io;
+    this.ioSupplier = ioSupplier;
     this.encryptionManager = encryptionManager;
     this.partitionId = partitionId;
     this.taskId = taskId;
@@ -101,7 +102,7 @@ private String generateFilename() {

   /** Generates an {@link EncryptedOutputFile} for unpartitioned writes. */
   public EncryptedOutputFile newOutputFile() {
-    OutputFile file = io.newOutputFile(locations.newDataLocation(generateFilename()));
+    OutputFile file = ioSupplier.get().newOutputFile(locations.newDataLocation(generateFilename()));
     return encryptionManager.encrypt(file);
   }

@@ -113,7 +114,7 @@ public EncryptedOutputFile newOutputFile(StructLike partition) {
   /** Generates an {@link EncryptedOutputFile} for partitioned writes in a given spec. */
   public EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partition) {
     String newDataLocation = locations.newDataLocation(spec, partition, generateFilename());
-    OutputFile rawOutputFile = io.newOutputFile(newDataLocation);
+    OutputFile rawOutputFile = ioSupplier.get().newOutputFile(newDataLocation);
     return encryptionManager.encrypt(rawOutputFile);
   }

Expand All @@ -125,6 +126,7 @@ public static class Builder {
     private String operationId;
     private FileFormat format;
     private String suffix;
+    private Supplier<FileIO> ioSupplier;
 
     private Builder(Table table, int partitionId, long taskId) {
       this.table = table;
@@ -136,6 +138,7 @@ private Builder(Table table, int partitionId, long taskId) {
       String formatAsString =
           table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
       this.format = FileFormat.fromString(formatAsString);
+      this.ioSupplier = table::io;
     }
 
     public Builder defaultSpec(PartitionSpec newDefaultSpec) {
@@ -158,12 +161,31 @@ public Builder suffix(String newSuffix) {
       return this;
     }
 
+    /**
+     * Configures a {@link FileIO} supplier, which can potentially be used to dynamically refresh
+     * the file IO instance when a table is refreshed.
+     *
+     * @param newIoSupplier The file IO supplier
+     * @return this builder instance
+     */
+    public Builder ioSupplier(Supplier<FileIO> newIoSupplier) {
+      this.ioSupplier = newIoSupplier;
+      return this;
+    }
+
     public OutputFileFactory build() {
       LocationProvider locations = table.locationProvider();
-      FileIO io = table.io();
       EncryptionManager encryption = table.encryption();
       return new OutputFileFactory(
-          defaultSpec, format, locations, io, encryption, partitionId, taskId, operationId, suffix);
+          defaultSpec,
+          format,
+          locations,
+          ioSupplier,
+          encryption,
+          partitionId,
+          taskId,
+          operationId,
+          suffix);
     }
   }
 }
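
The builder defaults to table::io, so existing callers see no behavior change; overriding the supplier matters when the FileIO should be re-resolved on use rather than captured once. A sketch, assuming partitionId and taskId variables are in scope:

    // Sketch only: re-resolve the FileIO on each new output file, e.g. so a
    // refreshed table can hand out a FileIO with refreshed credentials.
    OutputFileFactory fileFactory =
        OutputFileFactory.builderFor(table, partitionId, taskId)
            .ioSupplier(() -> table.io()) // assumption: table.io() reflects refreshes
            .build();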