Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support persisting TableMetadata in the metastore #433

Open
wants to merge 59 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
6c90a3d
wip
eric-maynard Nov 6, 2024
bda008e
first real commit
eric-maynard Nov 7, 2024
c38a01d
cleaning up
eric-maynard Nov 7, 2024
3a69953
tests fixed
eric-maynard Nov 7, 2024
ca856ed
begin refactoring
eric-maynard Nov 7, 2024
779d912
wire up to loadTable
eric-maynard Nov 7, 2024
b4c5335
autolint
eric-maynard Nov 7, 2024
6e23be4
logic to purge old records
eric-maynard Nov 7, 2024
5805679
autolint
eric-maynard Nov 7, 2024
f32a82c
stable, disabled
eric-maynard Nov 7, 2024
249ecac
passing when enabled
eric-maynard Nov 7, 2024
0db85eb
stable
eric-maynard Nov 7, 2024
6f0d9df
autolint
eric-maynard Nov 7, 2024
c0ea191
resolve conflicts; pull main
eric-maynard Nov 7, 2024
f250916
autolint
eric-maynard Nov 7, 2024
dd87d42
oops
eric-maynard Nov 7, 2024
c46eb03
add try/catch
eric-maynard Nov 7, 2024
1e1b953
autolint
eric-maynard Nov 7, 2024
a164807
added drop-on-drop
eric-maynard Nov 8, 2024
6ed3f97
autolint
eric-maynard Nov 8, 2024
35b852f
refactor to remove new entity
eric-maynard Nov 12, 2024
0b3df37
autolint
eric-maynard Nov 12, 2024
5f7c74f
fixes
eric-maynard Nov 12, 2024
d99a796
autolint
eric-maynard Nov 12, 2024
f1023ec
stable
eric-maynard Nov 12, 2024
045d93d
autolint
eric-maynard Nov 12, 2024
20edca5
polish
eric-maynard Nov 12, 2024
f189e98
one fix
eric-maynard Nov 12, 2024
5e90ba2
autolint
eric-maynard Nov 12, 2024
fbdce6a
?
eric-maynard Nov 12, 2024
f1461e5
more widespread use of loadTableMetadata
eric-maynard Nov 14, 2024
c5081d5
autolint
eric-maynard Nov 14, 2024
452c5a1
persist on write
eric-maynard Nov 14, 2024
b85ad92
autolint
eric-maynard Nov 14, 2024
fde3910
debugging
eric-maynard Nov 14, 2024
34b9b0f
fix failing test, add TODO
eric-maynard Nov 14, 2024
6bc07c8
debugging
eric-maynard Nov 14, 2024
a9dcaa1
fix current metadata check
eric-maynard Nov 14, 2024
9b66016
autolint
eric-maynard Nov 14, 2024
52b82f7
fix
eric-maynard Nov 14, 2024
ee3f401
one change per review
eric-maynard Nov 15, 2024
69c2ca5
fixes per review
eric-maynard Nov 17, 2024
7b3e68d
autolint
eric-maynard Nov 17, 2024
0b5291e
add a comment per review; make check more clear
eric-maynard Nov 19, 2024
5488d76
autolint
eric-maynard Nov 19, 2024
64e8d2b
fix method name
eric-maynard Nov 19, 2024
5eca203
autolint
eric-maynard Nov 19, 2024
7f89a24
remove check
eric-maynard Nov 21, 2024
9829486
autolint
eric-maynard Nov 21, 2024
73f5dd2
defer serialization
eric-maynard Nov 25, 2024
59361ed
bounded serde
eric-maynard Nov 25, 2024
c67492f
autolint
eric-maynard Nov 25, 2024
85ec25b
no more exceptions
eric-maynard Nov 25, 2024
424b7d9
comment
eric-maynard Nov 25, 2024
94b5864
Merge branch 'main' of github.com:apache/polaris into metadata-cache
eric-maynard Nov 25, 2024
a5fe3c1
fix
eric-maynard Nov 25, 2024
0b152f7
autolint
eric-maynard Nov 25, 2024
3584842
stable
eric-maynard Nov 25, 2024
8d704eb
fix
eric-maynard Nov 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import org.apache.polaris.core.admin.model.StorageConfigInfo;

public class PolarisConfiguration<T> {
Expand All @@ -30,15 +31,23 @@ public class PolarisConfiguration<T> {
public final T defaultValue;
private final Optional<String> catalogConfigImpl;
private final Class<T> typ;
private final Optional<Function<T, Boolean>> validation;

@SuppressWarnings("unchecked")
public PolarisConfiguration(
String key, String description, T defaultValue, Optional<String> catalogConfig) {
String key,
String description,
T defaultValue,
Optional<String> catalogConfig,
Optional<Function<T, Boolean>> validation) {
this.key = key;
this.description = description;
this.defaultValue = defaultValue;
this.catalogConfigImpl = catalogConfig;
this.typ = (Class<T>) defaultValue.getClass();
this.validation = validation;

validate(cast(defaultValue));
Copy link
Member

@RussellSpitzer RussellSpitzer Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this just a safety check? Feels like this shouldn't be able to break right?

Copy link
Member

@RussellSpitzer RussellSpitzer Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also isn't this redundant now. (cast calls validate)

So wouldn't this be

validate(
   cast(defaultValue){
      validate(defaultValue)
   }
}

}

public boolean hasCatalogConfig() {
Expand All @@ -53,14 +62,27 @@ public String catalogConfig() {
}

T cast(Object value) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kinda want to call this apply() now instead of cast ... :)

Just because it does dual duty now

return this.typ.cast(value);
T result = this.typ.cast(value);
validate(result);
return result;
}

private void validate(T value) {
this.validation.ifPresent(
v -> {
if (!v.apply(value)) {
throw new IllegalArgumentException(
String.format("Configuration %s has invalid value %s", key, defaultValue));
}
});
}

public static class Builder<T> {
private String key;
private String description;
private T defaultValue;
private Optional<String> catalogConfig = Optional.empty();
private Optional<Function<T, Boolean>> validation = Optional.empty();

public Builder<T> key(String key) {
this.key = key;
Expand Down Expand Up @@ -88,11 +110,16 @@ public Builder<T> catalogConfig(String catalogConfig) {
return this;
}

public Builder<T> validation(Function<T, Boolean> validation) {
this.validation = Optional.of(validation);
return this;
}

public PolarisConfiguration<T> build() {
if (key == null || description == null || defaultValue == null) {
throw new IllegalArgumentException("key, description, and defaultValue are required");
}
return new PolarisConfiguration<>(key, description, defaultValue, catalogConfig);
return new PolarisConfiguration<>(key, description, defaultValue, catalogConfig, validation);
}
}

Expand Down Expand Up @@ -206,4 +233,18 @@ public static <T> Builder<T> builder() {
"If set to true, allows tables to be dropped with the purge parameter set to true.")
.defaultValue(true)
.build();

public static final int METADATA_CACHE_MAX_BYTES_NO_CACHING = 0;
public static final int METADATA_CACHE_MAX_BYTES_INFINITE_CACHING = -1;
public static final PolarisConfiguration<Integer> METADATA_CACHE_MAX_BYTES =
PolarisConfiguration.<Integer>builder()
.key("METADATA_CACHE_MAX_BYTES")
.catalogConfig("metadata.cache.max.bytes")
.description(
"If nonzero, the approximate max size a table's metadata can be in order to be cached in the persistence"
+ " layer. If zero, no metadata will be cached or served from the cache. If -1, all metadata"
+ " will be cached.")
.defaultValue(METADATA_CACHE_MAX_BYTES_NO_CACHING)
.validation(value -> value >= -1)
.build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ public class TableLikeEntity extends PolarisEntity {
// of the internalProperties JSON file.
public static final String METADATA_LOCATION_KEY = "metadata-location";

// For applicable types, this key on the "internalProperties" map will return the content of the
// metadata.json file located at `METADATA_CACHE_LOCATION_KEY`
private static final String METADATA_CACHE_CONTENT_KEY = "metadata-cache-content";

// For applicable types, this key on the "internalProperties" map will return the location of the
// `metadata.json` that is cached in `METADATA_CACHE_CONTENT_KEY`. This will often match the
// current metadata location in `METADATA_LOCATION_KEY`; if it does not the cache is invalid
private static final String METADATA_CACHE_LOCATION_KEY = "metadata-cache-location";
eric-maynard marked this conversation as resolved.
Show resolved Hide resolved

public static final String USER_SPECIFIED_WRITE_DATA_LOCATION_KEY = "write.data.path";
public static final String USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY = "write.metadata.path";

Expand Down Expand Up @@ -67,6 +76,16 @@ public String getMetadataLocation() {
return getInternalPropertiesAsMap().get(METADATA_LOCATION_KEY);
}

@JsonIgnore
public String getMetadataCacheContent() {
return getInternalPropertiesAsMap().get(METADATA_CACHE_CONTENT_KEY);
}

@JsonIgnore
public String getMetadataCacheLocation() {
return getInternalPropertiesAsMap().get(METADATA_CACHE_LOCATION_KEY);
}

@JsonIgnore
public Optional<Long> getLastAdmittedNotificationTimestamp() {
return Optional.ofNullable(
Expand Down Expand Up @@ -121,6 +140,12 @@ public Builder setMetadataLocation(String location) {
return this;
}

public Builder setMetadataContent(String location, String content) {
internalProperties.put(METADATA_CACHE_LOCATION_KEY, location);
internalProperties.put(METADATA_CACHE_CONTENT_KEY, content);
return this;
}

public Builder setLastNotificationTimestamp(long timestamp) {
internalProperties.put(LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY, String.valueOf(timestamp));
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public EntityCacheTest() {
callCtx = new PolarisCallContext(metaStore, diagServices);
metaStoreManager = new PolarisMetaStoreManagerImpl();

// bootstrap the mata store with our test schema
// bootstrap the metastore with our test schema
tm = new PolarisTestMetaStoreManager(metaStoreManager, callCtx);
tm.testCreateTestCatalog();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
Expand All @@ -35,6 +37,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.exception.ExceptionUtils;
Expand All @@ -46,6 +49,7 @@
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
Expand All @@ -64,7 +68,9 @@
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.view.BaseMetastoreViewCatalog;
import org.apache.iceberg.view.BaseViewOperations;
import org.apache.iceberg.view.ViewBuilder;
Expand Down Expand Up @@ -103,6 +109,7 @@
import org.apache.polaris.core.storage.aws.PolarisS3FileIOClientFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.exception.IcebergExceptionMapper;
import org.apache.polaris.service.persistence.MetadataCacheManager;
import org.apache.polaris.service.task.TaskExecutor;
import org.apache.polaris.service.types.NotificationRequest;
import org.apache.polaris.service.types.NotificationType;
Expand Down Expand Up @@ -822,6 +829,34 @@ public Map<String, String> getCredentialConfig(
storageInfo.get());
}

public TableMetadata loadTableMetadata(TableIdentifier identifier) {
int maxMetadataCacheBytes =
callContext
.getPolarisCallContext()
.getConfigurationStore()
.getConfiguration(
callContext.getPolarisCallContext(), PolarisConfiguration.METADATA_CACHE_MAX_BYTES);
if (maxMetadataCacheBytes == PolarisConfiguration.METADATA_CACHE_MAX_BYTES_NO_CACHING) {
return loadTableMetadata(loadTable(identifier));
} else {
Supplier<TableMetadata> fallback = () -> loadTableMetadata(loadTable(identifier));
return MetadataCacheManager.loadTableMetadata(
identifier,
maxMetadataCacheBytes,
callContext.getPolarisCallContext(),
metaStoreManager,
resolvedEntityView,
fallback);
}
}

private static TableMetadata loadTableMetadata(Table table) {
if (table instanceof BaseTable baseTable) {
return baseTable.operations().current();
}
throw new IllegalArgumentException("Cannot load metadata for " + table.name());
}

/**
* Based on configuration settings, for callsites that need to handle potentially setting a new
* base location for a TableLike entity, produces the transformed location if applicable, or else
Expand Down Expand Up @@ -1344,24 +1379,48 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
TableLikeEntity entity =
TableLikeEntity.of(resolvedEntities == null ? null : resolvedEntities.getRawLeafEntity());
String existingLocation;
int maxMetadataCacheBytes =
callContext
.getPolarisCallContext()
.getConfigurationStore()
.getConfiguration(
callContext.getPolarisCallContext(),
catalogEntity,
PolarisConfiguration.METADATA_CACHE_MAX_BYTES);
Optional<String> metadataJsonOpt = Optional.empty();
boolean shouldPersistMetadata =
switch (maxMetadataCacheBytes) {
case PolarisConfiguration.METADATA_CACHE_MAX_BYTES_INFINITE_CACHING -> true;
case PolarisConfiguration.METADATA_CACHE_MAX_BYTES_NO_CACHING -> false;
default -> {
metadataJsonOpt = MetadataCacheManager.toBoundedJson(metadata, maxMetadataCacheBytes);
yield metadataJsonOpt
.map(String::length)
.map(l -> l <= maxMetadataCacheBytes)
.orElse(false);
}
};
final TableLikeEntity.Builder builder;
if (null == entity) {
existingLocation = null;
entity =
builder =
new TableLikeEntity.Builder(tableIdentifier, newLocation)
.setCatalogId(getCatalogId())
.setSubType(PolarisEntitySubType.TABLE)
.setBaseLocation(metadata.location())
.setId(
getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId())
.build();
getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId());
} else {
existingLocation = entity.getMetadataLocation();
entity =
builder =
new TableLikeEntity.Builder(entity)
.setBaseLocation(metadata.location())
.setMetadataLocation(newLocation)
.build();
.setMetadataLocation(newLocation);
}
if (shouldPersistMetadata && metadataJsonOpt.isPresent()) {
builder.setMetadataContent(newLocation, metadataJsonOpt.get());
}
entity = builder.build();
if (!Objects.equal(existingLocation, oldLocation)) {
if (null == base) {
throw new AlreadyExistsException("Table already exists: %s", tableName());
Expand All @@ -1383,6 +1442,98 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
}
}

/**
* Copied from {@link BaseMetastoreTableOperations} but without the requirement that base ==
* current()
*
* @param base table metadata on which changes were based
* @param metadata new table metadata with updates
*/
@Override
public void commit(TableMetadata base, TableMetadata metadata) {
TableMetadata currentMetadata = current();

// if the metadata is already out of date, reject it
if (base == null) {
if (currentMetadata != null) {
// when current is non-null, the table exists. but when base is null, the commit is trying
// to create the table
throw new AlreadyExistsException("Table already exists: %s", tableName());
}
} else if (base.metadataFileLocation() != null
&& !base.metadataFileLocation().equals(currentMetadata.metadataFileLocation())) {
throw new CommitFailedException("Cannot commit: stale table metadata");
} else if (base != currentMetadata) {
// This branch is different from BaseMetastoreTableOperations
LOGGER.debug(
"Base object differs from current metadata; proceeding because locations match");
} else if (base.metadataFileLocation().equals(metadata.metadataFileLocation())) {
// if the metadata is not changed, return early
LOGGER.info("Nothing to commit.");
return;
}

long start = System.currentTimeMillis();
doCommit(base, metadata);
deleteRemovedMetadataFiles(base, metadata);
requestRefresh();

LOGGER.info(
"Successfully committed to table {} in {} ms",
tableName(),
System.currentTimeMillis() - start);
}

/**
* Copied from {@link BaseMetastoreTableOperations} as the method is private there This is moved
* to `CatalogUtils` in Iceberg 1.7.0 and can be called from there once we depend on Iceberg
* 1.7.0
*
* <p>Deletes the oldest metadata files if {@link
* TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} is true.
*
* @param base table metadata on which previous versions were based
* @param metadata new table metadata with updated previous versions
*/
protected void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metadata) {
if (base == null) {
return;
}

boolean deleteAfterCommit =
metadata.propertyAsBoolean(
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT);

if (deleteAfterCommit) {
Set<TableMetadata.MetadataLogEntry> removedPreviousMetadataFiles =
Sets.newHashSet(base.previousFiles());
// TableMetadata#addPreviousFile builds up the metadata log and uses
// TableProperties.METADATA_PREVIOUS_VERSIONS_MAX to determine how many files should stay in
// the log, thus we don't include metadata.previousFiles() for deletion - everything else
// can
// be removed
removedPreviousMetadataFiles.removeAll(metadata.previousFiles());
if (io() instanceof SupportsBulkOperations) {
((SupportsBulkOperations) io())
.deleteFiles(
Iterables.transform(
removedPreviousMetadataFiles, TableMetadata.MetadataLogEntry::file));
} else {
Tasks.foreach(removedPreviousMetadataFiles)
.noRetry()
.suppressFailureWhenFinished()
.onFailure(
(previousMetadataFile, exc) ->
LOGGER.warn(
"Delete failed for previous metadata file: {}",
previousMetadataFile,
exc))
.run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file()));
}
}
}

@Override
public FileIO io() {
return tableFileIO;
Expand Down Expand Up @@ -1867,6 +2018,7 @@ private void updateTableLike(TableIdentifier identifier, PolarisEntity entity) {
}
}

// Drop the table:
return getMetaStoreManager()
.dropEntityIfExists(
getCurrentPolarisContext(),
Expand Down
Loading
Loading