diff --git a/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfiguration.java index 45a2f3c2a..1f3c531af 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfiguration.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.function.Function; import org.apache.polaris.core.admin.model.StorageConfigInfo; public class PolarisConfiguration { @@ -30,15 +31,24 @@ public class PolarisConfiguration { public final T defaultValue; private final Optional catalogConfigImpl; private final Class typ; + private final Optional> validation; @SuppressWarnings("unchecked") public PolarisConfiguration( - String key, String description, T defaultValue, Optional catalogConfig) { + String key, + String description, + T defaultValue, + Optional catalogConfig, + Optional> validation) { this.key = key; this.description = description; this.defaultValue = defaultValue; this.catalogConfigImpl = catalogConfig; this.typ = (Class) defaultValue.getClass(); + this.validation = validation; + + // Force validation: + cast(defaultValue); } public boolean hasCatalogConfig() { @@ -53,7 +63,19 @@ public String catalogConfig() { } T cast(Object value) { - return this.typ.cast(value); + T result = this.typ.cast(value); + validate(result); + return result; + } + + private void validate(T value) { + this.validation.ifPresent( + v -> { + if (!v.apply(value)) { + throw new IllegalArgumentException( + String.format("Configuration %s has invalid value %s", key, defaultValue)); + } + }); } public static class Builder { @@ -61,6 +83,7 @@ public static class Builder { private String description; private T defaultValue; private Optional catalogConfig = Optional.empty(); + private Optional> validation = Optional.empty(); public Builder key(String key) { this.key 
= key; @@ -88,11 +111,16 @@ public Builder catalogConfig(String catalogConfig) { return this; } + public Builder validation(Function validation) { + this.validation = Optional.of(validation); + return this; + } + public PolarisConfiguration build() { if (key == null || description == null || defaultValue == null) { throw new IllegalArgumentException("key, description, and defaultValue are required"); } - return new PolarisConfiguration<>(key, description, defaultValue, catalogConfig); + return new PolarisConfiguration<>(key, description, defaultValue, catalogConfig, validation); } } @@ -206,4 +234,18 @@ public static Builder builder() { "If set to true, allows tables to be dropped with the purge parameter set to true.") .defaultValue(true) .build(); + + public static final int METADATA_CACHE_MAX_BYTES_NO_CACHING = 0; + public static final int METADATA_CACHE_MAX_BYTES_INFINITE_CACHING = -1; + public static final PolarisConfiguration METADATA_CACHE_MAX_BYTES = + PolarisConfiguration.builder() + .key("METADATA_CACHE_MAX_BYTES") + .catalogConfig("metadata.cache.max.bytes") + .description( + "If nonzero, the approximate max size a table's metadata can be in order to be cached in the persistence" + + " layer. If zero, no metadata will be cached or served from the cache. If -1, all metadata" + + " will be cached.") + .defaultValue(METADATA_CACHE_MAX_BYTES_NO_CACHING) + .validation(value -> value >= -1) + .build(); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/entity/TableLikeEntity.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/TableLikeEntity.java index 968598b93..c38b2d95e 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/entity/TableLikeEntity.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/TableLikeEntity.java @@ -29,6 +29,15 @@ public class TableLikeEntity extends PolarisEntity { // of the internalProperties JSON file. 
public static final String METADATA_LOCATION_KEY = "metadata-location"; + // For applicable types, this key on the "internalProperties" map will return the content of the + // metadata.json file located at `METADATA_CACHE_LOCATION_KEY` + private static final String METADATA_CACHE_CONTENT_KEY = "metadata-cache-content"; + + // For applicable types, this key on the "internalProperties" map will return the location of the + // `metadata.json` that is cached in `METADATA_CACHE_CONTENT_KEY`. This will often match the + // current metadata location in `METADATA_LOCATION_KEY`; if it does not the cache is invalid + private static final String METADATA_CACHE_LOCATION_KEY = "metadata-cache-location"; + public static final String USER_SPECIFIED_WRITE_DATA_LOCATION_KEY = "write.data.path"; public static final String USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY = "write.metadata.path"; @@ -67,6 +76,16 @@ public String getMetadataLocation() { return getInternalPropertiesAsMap().get(METADATA_LOCATION_KEY); } + @JsonIgnore + public String getMetadataCacheContent() { + return getInternalPropertiesAsMap().get(METADATA_CACHE_CONTENT_KEY); + } + + @JsonIgnore + public String getMetadataCacheLocation() { + return getInternalPropertiesAsMap().get(METADATA_CACHE_LOCATION_KEY); + } + @JsonIgnore public Optional getLastAdmittedNotificationTimestamp() { return Optional.ofNullable( @@ -121,6 +140,12 @@ public Builder setMetadataLocation(String location) { return this; } + public Builder setMetadataContent(String location, String content) { + internalProperties.put(METADATA_CACHE_LOCATION_KEY, location); + internalProperties.put(METADATA_CACHE_CONTENT_KEY, content); + return this; + } + public Builder setLastNotificationTimestamp(long timestamp) { internalProperties.put(LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY, String.valueOf(timestamp)); return this; diff --git a/polaris-core/src/test/java/org/apache/polaris/core/persistence/EntityCacheTest.java 
b/polaris-core/src/test/java/org/apache/polaris/core/persistence/EntityCacheTest.java index c47367769..1c0851133 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/persistence/EntityCacheTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/persistence/EntityCacheTest.java @@ -88,7 +88,7 @@ public EntityCacheTest() { callCtx = new PolarisCallContext(metaStore, diagServices); metaStoreManager = new PolarisMetaStoreManagerImpl(); - // bootstrap the mata store with our test schema + // bootstrap the metastore with our test schema tm = new PolarisTestMetaStoreManager(metaStoreManager, callCtx); tm.testCreateTestCatalog(); } diff --git a/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java b/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java index 4911b5cfd..8331dbd5a 100644 --- a/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java +++ b/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java @@ -23,6 +23,8 @@ import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; import java.io.Closeable; import java.io.IOException; import java.util.Arrays; @@ -35,6 +37,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -46,6 +49,7 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.aws.s3.S3FileIOProperties; import org.apache.iceberg.catalog.Namespace; import 
org.apache.iceberg.catalog.SupportsNamespaces; @@ -64,7 +68,9 @@ import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.Tasks; import org.apache.iceberg.view.BaseMetastoreViewCatalog; import org.apache.iceberg.view.BaseViewOperations; import org.apache.iceberg.view.ViewBuilder; @@ -103,6 +109,7 @@ import org.apache.polaris.core.storage.aws.PolarisS3FileIOClientFactory; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.exception.IcebergExceptionMapper; +import org.apache.polaris.service.persistence.MetadataCacheManager; import org.apache.polaris.service.task.TaskExecutor; import org.apache.polaris.service.types.NotificationRequest; import org.apache.polaris.service.types.NotificationType; @@ -822,6 +829,34 @@ public Map getCredentialConfig( storageInfo.get()); } + public TableMetadata loadTableMetadata(TableIdentifier identifier) { + int maxMetadataCacheBytes = + callContext + .getPolarisCallContext() + .getConfigurationStore() + .getConfiguration( + callContext.getPolarisCallContext(), PolarisConfiguration.METADATA_CACHE_MAX_BYTES); + if (maxMetadataCacheBytes == PolarisConfiguration.METADATA_CACHE_MAX_BYTES_NO_CACHING) { + return loadTableMetadata(loadTable(identifier)); + } else { + Supplier fallback = () -> loadTableMetadata(loadTable(identifier)); + return MetadataCacheManager.loadTableMetadata( + identifier, + maxMetadataCacheBytes, + callContext.getPolarisCallContext(), + metaStoreManager, + resolvedEntityView, + fallback); + } + } + + private static TableMetadata loadTableMetadata(Table table) { + if (table instanceof BaseTable baseTable) { + return baseTable.operations().current(); + } + throw new IllegalArgumentException("Cannot load metadata for " + table.name()); + } + /** * Based on configuration settings, for 
callsites that need to handle potentially setting a new * base location for a TableLike entity, produces the transformed location if applicable, or else @@ -1344,24 +1379,51 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { TableLikeEntity entity = TableLikeEntity.of(resolvedEntities == null ? null : resolvedEntities.getRawLeafEntity()); String existingLocation; + int maxMetadataCacheBytes = + callContext + .getPolarisCallContext() + .getConfigurationStore() + .getConfiguration( + callContext.getPolarisCallContext(), + catalogEntity, + PolarisConfiguration.METADATA_CACHE_MAX_BYTES); + Optional metadataJsonOpt = Optional.empty(); + boolean shouldPersistMetadata = + switch (maxMetadataCacheBytes) { + case PolarisConfiguration.METADATA_CACHE_MAX_BYTES_NO_CACHING -> false; + case PolarisConfiguration.METADATA_CACHE_MAX_BYTES_INFINITE_CACHING -> { + metadataJsonOpt = MetadataCacheManager.toBoundedJson(metadata, maxMetadataCacheBytes); + yield true; + } + default -> { + metadataJsonOpt = MetadataCacheManager.toBoundedJson(metadata, maxMetadataCacheBytes); + yield metadataJsonOpt + .map(String::length) + .map(l -> l <= maxMetadataCacheBytes) + .orElse(false); + } + }; + final TableLikeEntity.Builder builder; if (null == entity) { existingLocation = null; - entity = + builder = new TableLikeEntity.Builder(tableIdentifier, newLocation) .setCatalogId(getCatalogId()) .setSubType(PolarisEntitySubType.TABLE) .setBaseLocation(metadata.location()) .setId( - getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId()) - .build(); + getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId()); } else { existingLocation = entity.getMetadataLocation(); - entity = + builder = new TableLikeEntity.Builder(entity) .setBaseLocation(metadata.location()) - .setMetadataLocation(newLocation) - .build(); + .setMetadataLocation(newLocation); + } + if (shouldPersistMetadata && metadataJsonOpt.isPresent()) { + 
builder.setMetadataContent(newLocation, metadataJsonOpt.get()); } + entity = builder.build(); if (!Objects.equal(existingLocation, oldLocation)) { if (null == base) { throw new AlreadyExistsException("Table already exists: %s", tableName()); @@ -1383,6 +1445,98 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { } } + /** + * Copied from {@link BaseMetastoreTableOperations} but without the requirement that base == + * current() + * + * @param base table metadata on which changes were based + * @param metadata new table metadata with updates + */ + @Override + public void commit(TableMetadata base, TableMetadata metadata) { + TableMetadata currentMetadata = current(); + + // if the metadata is already out of date, reject it + if (base == null) { + if (currentMetadata != null) { + // when current is non-null, the table exists. but when base is null, the commit is trying + // to create the table + throw new AlreadyExistsException("Table already exists: %s", tableName()); + } + } else if (base.metadataFileLocation() != null + && !base.metadataFileLocation().equals(currentMetadata.metadataFileLocation())) { + throw new CommitFailedException("Cannot commit: stale table metadata"); + } else if (base != currentMetadata) { + // This branch is different from BaseMetastoreTableOperations + LOGGER.debug( + "Base object differs from current metadata; proceeding because locations match"); + } else if (base.metadataFileLocation().equals(metadata.metadataFileLocation())) { + // if the metadata is not changed, return early + LOGGER.info("Nothing to commit."); + return; + } + + long start = System.currentTimeMillis(); + doCommit(base, metadata); + deleteRemovedMetadataFiles(base, metadata); + requestRefresh(); + + LOGGER.info( + "Successfully committed to table {} in {} ms", + tableName(), + System.currentTimeMillis() - start); + } + + /** + * Copied from {@link BaseMetastoreTableOperations} as the method is private there This is moved + * to `CatalogUtils` in 
Iceberg 1.7.0 and can be called from there once we depend on Iceberg + * 1.7.0 + * + *

Deletes the oldest metadata files if {@link + * TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} is true. + * + * @param base table metadata on which previous versions were based + * @param metadata new table metadata with updated previous versions + */ + protected void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metadata) { + if (base == null) { + return; + } + + boolean deleteAfterCommit = + metadata.propertyAsBoolean( + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT); + + if (deleteAfterCommit) { + Set removedPreviousMetadataFiles = + Sets.newHashSet(base.previousFiles()); + // TableMetadata#addPreviousFile builds up the metadata log and uses + // TableProperties.METADATA_PREVIOUS_VERSIONS_MAX to determine how many files should stay in + // the log, thus we don't include metadata.previousFiles() for deletion - everything else + // can + // be removed + removedPreviousMetadataFiles.removeAll(metadata.previousFiles()); + if (io() instanceof SupportsBulkOperations) { + ((SupportsBulkOperations) io()) + .deleteFiles( + Iterables.transform( + removedPreviousMetadataFiles, TableMetadata.MetadataLogEntry::file)); + } else { + Tasks.foreach(removedPreviousMetadataFiles) + .noRetry() + .suppressFailureWhenFinished() + .onFailure( + (previousMetadataFile, exc) -> + LOGGER.warn( + "Delete failed for previous metadata file: {}", + previousMetadataFile, + exc)) + .run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file())); + } + } + } + @Override public FileIO io() { return tableFileIO; @@ -1867,6 +2021,7 @@ private void updateTableLike(TableIdentifier identifier, PolarisEntity entity) { } } + // Drop the table: return getMetaStoreManager() .dropEntityIfExists( getCurrentPolarisContext(), diff --git a/polaris-service/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java 
b/polaris-service/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java index 3d12f75a7..6c4c8cf82 100644 --- a/polaris-service/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java +++ b/polaris-service/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java @@ -615,7 +615,14 @@ public LoadTableResponse createTableDirectWithWriteDelegation( .create(); if (table instanceof BaseTable baseTable) { - TableMetadata tableMetadata = baseTable.operations().current(); + + final TableMetadata tableMetadata; + if (baseCatalog instanceof BasePolarisCatalog basePolarisCatalog) { + tableMetadata = basePolarisCatalog.loadTableMetadata(tableIdentifier); + } else { + tableMetadata = baseTable.operations().current(); + } + LoadTableResponse.Builder responseBuilder = LoadTableResponse.builder().withTableMetadata(tableMetadata); if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) { @@ -796,7 +803,16 @@ public LoadTableResponse loadTable(TableIdentifier tableIdentifier, String snaps PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LOAD_TABLE; authorizeBasicTableLikeOperationOrThrow(op, PolarisEntitySubType.TABLE, tableIdentifier); - return doCatalogOperation(() -> CatalogHandlers.loadTable(baseCatalog, tableIdentifier)); + return doCatalogOperation( + () -> { + if (baseCatalog instanceof BasePolarisCatalog basePolarisCatalog) { + return LoadTableResponse.builder() + .withTableMetadata(basePolarisCatalog.loadTableMetadata(tableIdentifier)) + .build(); + } + + return CatalogHandlers.loadTable(baseCatalog, tableIdentifier); + }); } public LoadTableResponse loadTableWithAccessDelegation( @@ -847,29 +863,31 @@ public LoadTableResponse loadTableWithAccessDelegation( // when data-access is specified but access delegation grants are not found. 
return doCatalogOperation( () -> { - Table table = baseCatalog.loadTable(tableIdentifier); + TableMetadata tableMetadata = null; + if (baseCatalog instanceof BasePolarisCatalog basePolarisCatalog) { + tableMetadata = basePolarisCatalog.loadTableMetadata(tableIdentifier); + } - if (table instanceof BaseTable baseTable) { - TableMetadata tableMetadata = baseTable.operations().current(); + // The metadata failed to load + if (tableMetadata == null) { + throw new NoSuchTableException("Table does not exist: %s", tableIdentifier.toString()); + } + + if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) { LoadTableResponse.Builder responseBuilder = LoadTableResponse.builder().withTableMetadata(tableMetadata); - if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) { - LOGGER - .atDebug() - .addKeyValue("tableIdentifier", tableIdentifier) - .addKeyValue("tableLocation", tableMetadata.location()) - .log("Fetching client credentials for table"); - responseBuilder.addAllConfig( - credentialDelegation.getCredentialConfig( - tableIdentifier, tableMetadata, actionsRequested)); - } + LOGGER + .atDebug() + .addKeyValue("tableIdentifier", tableIdentifier) + .addKeyValue("tableLocation", tableMetadata.location()) + .log("Fetching client credentials for table"); + responseBuilder.addAllConfig( + credentialDelegation.getCredentialConfig( + tableIdentifier, tableMetadata, actionsRequested)); return responseBuilder.build(); - } else if (table instanceof BaseMetadataTable) { - // metadata tables are loaded on the client side, return NoSuchTableException for now - throw new NoSuchTableException("Table does not exist: %s", tableIdentifier.toString()); + } else { + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); } - - throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); }); } @@ -1021,20 +1039,27 @@ public void commitTransaction(CommitTransactionRequest 
commitTransactionRequest) commitTransactionRequest.tableChanges().stream() .forEach( change -> { - Table table = baseCatalog.loadTable(change.identifier()); + final Table table = baseCatalog.loadTable(change.identifier()); + if (!(table instanceof BaseTable)) { throw new IllegalStateException( "Cannot wrap catalog that does not produce BaseTable"); } + + TableOperations tableOps = ((BaseTable) table).operations(); + final TableMetadata currentMetadata; + if (baseCatalog instanceof BasePolarisCatalog basePolarisCatalog) { + currentMetadata = basePolarisCatalog.loadTableMetadata(change.identifier()); + } else { + currentMetadata = tableOps.current(); + } + if (isCreate(change)) { throw new BadRequestException( "Unsupported operation: commitTranaction with updateForStagedCreate: %s", change); } - TableOperations tableOps = ((BaseTable) table).operations(); - TableMetadata currentMetadata = tableOps.current(); - // Validate requirements; any CommitFailedExceptions will fail the overall request change.requirements().forEach(requirement -> requirement.validate(currentMetadata)); diff --git a/polaris-service/src/main/java/org/apache/polaris/service/persistence/MetadataCacheManager.java b/polaris-service/src/main/java/org/apache/polaris/service/persistence/MetadataCacheManager.java new file mode 100644 index 000000000..788f92f4e --- /dev/null +++ b/polaris-service/src/main/java/org/apache/polaris/service/persistence/MetadataCacheManager.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.persistence; + +import com.fasterxml.jackson.core.JsonGenerator; +import java.io.IOException; +import java.io.StringWriter; +import java.io.Writer; +import java.util.Optional; +import java.util.function.Supplier; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.util.JsonUtil; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.PolarisConfiguration; +import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.entity.PolarisEntitySubType; +import org.apache.polaris.core.entity.TableLikeEntity; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetadataCacheManager { + private static final Logger LOGGER = LoggerFactory.getLogger(MetadataCacheManager.class); + + /** + * Load the cached {@link Table} or fall back to `fallback` if one doesn't exist. If the metadata + * is not currently cached, it may be added to the cache. 
+ */ + public static TableMetadata loadTableMetadata( + TableIdentifier tableIdentifier, + int maxBytesToCache, + PolarisCallContext callContext, + PolarisMetaStoreManager metastoreManager, + PolarisResolutionManifestCatalogView resolvedEntityView, + Supplier fallback) { + LOGGER.debug(String.format("Loading cached metadata for %s", tableIdentifier)); + PolarisResolvedPathWrapper resolvedEntities = + resolvedEntityView.getResolvedPath(tableIdentifier, PolarisEntitySubType.TABLE); + TableLikeEntity tableLikeEntity = TableLikeEntity.of(resolvedEntities.getRawLeafEntity()); + String cacheContent = tableLikeEntity.getMetadataCacheContent(); + if (cacheContent != null) { + LOGGER.debug(String.format("Using cached metadata for %s", tableIdentifier)); + return TableMetadataParser.fromJson(tableLikeEntity.getMetadataCacheContent()); + } else { + TableMetadata metadata = fallback.get(); + PolarisMetaStoreManager.EntityResult cacheResult = + cacheTableMetadata( + tableLikeEntity, + metadata, + maxBytesToCache, + callContext, + metastoreManager, + resolvedEntityView); + if (!cacheResult.isSuccess()) { + LOGGER.debug(String.format("Failed to cache metadata for %s", tableIdentifier)); + } + return metadata; + } + } + + /** Convert a {@link TableMetadata} to JSON, with the size bounded */ + public static Optional toBoundedJson(TableMetadata metadata, int maxBytes) { + try (StringWriter unboundedWriter = new StringWriter()) { + BoundedStringWriter boundedWriter = new BoundedStringWriter(unboundedWriter, maxBytes); + JsonGenerator generator = JsonUtil.factory().createGenerator(boundedWriter); + TableMetadataParser.toJson(metadata, generator); + generator.flush(); + String result = boundedWriter.toString(); + if (boundedWriter.isLimitExceeded()) { + return Optional.empty(); + } else { + return Optional.ofNullable(result); + } + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to write json for: %s", metadata); + } + } + + /** + * Attempt to add table metadata to 
the cache + * + * @return The result of trying to cache the metadata + */ + private static PolarisMetaStoreManager.EntityResult cacheTableMetadata( + TableLikeEntity tableLikeEntity, + TableMetadata metadata, + int maxBytesToCache, + PolarisCallContext callContext, + PolarisMetaStoreManager metaStoreManager, + PolarisResolutionManifestCatalogView resolvedEntityView) { + Optional jsonOpt = toBoundedJson(metadata, maxBytesToCache); + // We should not reach this method in this case, but check just in case... + if (maxBytesToCache != PolarisConfiguration.METADATA_CACHE_MAX_BYTES_NO_CACHING) { + if (jsonOpt.isEmpty()) { + LOGGER.debug( + String.format( + "Will not cache metadata for %s; metadata above the limit of %d bytes", + tableLikeEntity.getTableIdentifier(), maxBytesToCache)); + return new PolarisMetaStoreManager.EntityResult( + PolarisMetaStoreManager.EntityResult.ReturnStatus.SUCCESS, null); + } else { + LOGGER.debug( + String.format("Caching metadata for %s", tableLikeEntity.getTableIdentifier())); + TableLikeEntity newTableLikeEntity = + new TableLikeEntity.Builder(tableLikeEntity) + .setMetadataContent(tableLikeEntity.getMetadataLocation(), jsonOpt.get()) + .build(); + PolarisResolvedPathWrapper resolvedPath = + resolvedEntityView.getResolvedPath( + tableLikeEntity.getTableIdentifier(), PolarisEntitySubType.TABLE); + try { + return metaStoreManager.updateEntityPropertiesIfNotChanged( + callContext, + PolarisEntity.toCoreList(resolvedPath.getRawParentPath()), + newTableLikeEntity); + } catch (RuntimeException e) { + // PersistenceException (& other extension-specific exceptions) may not be in scope, + // but we can make a best-effort attempt to swallow it and just forego caching + if (e.toString().contains("PersistenceException")) { + LOGGER.debug( + String.format( + "Encountered an error while caching %s: %s", + tableLikeEntity.getTableIdentifier(), e)); + return new PolarisMetaStoreManager.EntityResult( + 
PolarisMetaStoreManager.EntityResult.ReturnStatus.UNEXPECTED_ERROR_SIGNALED, + e.getMessage()); + } else { + throw e; + } + } + } + } else { + LOGGER.debug( + String.format( + "Will not cache metadata for %s; metadata caching is disabled", + tableLikeEntity.getTableIdentifier())); + return new PolarisMetaStoreManager.EntityResult( + PolarisMetaStoreManager.EntityResult.ReturnStatus.SUCCESS, null); + } + } + + private static class BoundedStringWriter extends Writer { + private final Writer delegate; + private final int maxBytes; + private long writtenBytes = 0; + private boolean limitExceeded = false; + + /** Create a new BoundedWriter with a given limit `maxBytes`. -1 means no limit. */ + public BoundedStringWriter(StringWriter delegate, int maxBytes) { + this.delegate = delegate; + if (maxBytes == -1) { + this.maxBytes = Integer.MAX_VALUE; + } else { + this.maxBytes = maxBytes; + } + } + + private boolean canWriteBytes(long bytesToWrite) throws IOException { + if (writtenBytes + bytesToWrite > maxBytes) { + limitExceeded = true; + } + return !limitExceeded; + } + + /** `true` when the writer was asked to write more than `maxBytes` bytes */ + public final boolean isLimitExceeded() { + return limitExceeded; + } + + @Override + public final void write(char[] cbuf, int off, int len) throws IOException { + if (canWriteBytes(len)) { + delegate.write(cbuf, off, len); + writtenBytes += len; + } + } + + @Override + public final void write(int c) throws IOException { + if (canWriteBytes(1)) { + delegate.write(c); + writtenBytes++; + } + } + + @Override + public final void write(String str, int off, int len) throws IOException { + if (canWriteBytes(len)) { + delegate.write(str, off, len); + writtenBytes += len; + } + } + + @Override + public final Writer append(CharSequence csq) throws IOException { + String str = (csq == null) ? 
"null" : csq.toString(); + write(str, 0, str.length()); + return this; + } + + @Override + public final Writer append(CharSequence csq, int start, int end) throws IOException { + String str = (csq == null) ? "null" : csq.subSequence(start, end).toString(); + write(str, 0, str.length()); + return this; + } + + @Override + public final Writer append(char c) throws IOException { + write(c); + return this; + } + + @Override + public final void flush() throws IOException { + delegate.flush(); + } + + @Override + public final void close() throws IOException { + delegate.close(); + } + + @Override + public final String toString() { + return delegate.toString(); + } + } +} diff --git a/polaris-service/src/test/java/org/apache/polaris/service/catalog/BasePolarisCatalogTest.java b/polaris-service/src/test/java/org/apache/polaris/service/catalog/BasePolarisCatalogTest.java index 7b0270c40..cbfd09c7b 100644 --- a/polaris-service/src/test/java/org/apache/polaris/service/catalog/BasePolarisCatalogTest.java +++ b/polaris-service/src/test/java/org/apache/polaris/service/catalog/BasePolarisCatalogTest.java @@ -45,6 +45,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.catalog.CatalogTests; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -80,6 +81,7 @@ import org.apache.polaris.core.persistence.PolarisEntityManager; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.PolarisMetaStoreSession; +import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView; import org.apache.polaris.core.storage.PolarisCredentialProperty; import org.apache.polaris.core.storage.PolarisStorageIntegration; import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; @@ -92,6 +94,7 @@ import 
org.apache.polaris.service.catalog.io.TestFileIOFactory; import org.apache.polaris.service.exception.IcebergExceptionMapper; import org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory; +import org.apache.polaris.service.persistence.MetadataCacheManager; import org.apache.polaris.service.task.TableCleanupTaskHandler; import org.apache.polaris.service.task.TaskExecutor; import org.apache.polaris.service.task.TaskFileIOSupplier; @@ -132,6 +135,7 @@ public class BasePolarisCatalogTest extends CatalogTests { private PolarisEntityManager entityManager; private AuthenticatedPolarisPrincipal authenticatedRoot; private PolarisEntity catalogEntity; + private PolarisResolutionManifestCatalogView passthroughView; @BeforeEach @SuppressWarnings("unchecked") @@ -202,10 +206,11 @@ public void before() { PolarisConfiguration.ALLOW_EXTERNAL_TABLE_LOCATION.catalogConfig(), "true") .addProperty( PolarisConfiguration.ALLOW_UNSTRUCTURED_TABLE_LOCATION.catalogConfig(), "true") + .addProperty(PolarisConfiguration.METADATA_CACHE_MAX_BYTES.catalogConfig(), "-1") .setStorageConfigurationInfo(storageConfigModel, storageLocation) .build()); - PolarisPassthroughResolutionView passthroughView = + passthroughView = new PolarisPassthroughResolutionView( callContext, entityManager, authenticatedRoot, CATALOG_NAME); TaskExecutor taskExecutor = Mockito.mock(); @@ -1523,4 +1528,63 @@ public void testFileIOWrapper() { .getFirst())); Assertions.assertThat(measured.getNumDeletedFiles()).as("A table was deleted").isGreaterThan(0); } + + private Schema buildSchema(int fields) { + Types.NestedField[] fieldsArray = new Types.NestedField[fields]; + for (int i = 0; i < fields; i++) { + fieldsArray[i] = Types.NestedField.optional(i, "field_" + i, Types.IntegerType.get()); + } + return new Schema(fieldsArray); + } + + @Test + public void testMetadataCachingWithManualFallback() { + Namespace namespace = Namespace.of("manual-namespace"); + TableIdentifier tableIdentifier = 
TableIdentifier.of(namespace, "t1"); + + Schema schema = buildSchema(10); + + catalog.createNamespace(namespace); + Table createdTable = catalog.createTable(tableIdentifier, schema); + TableMetadata originalMetadata = ((BaseTable) createdTable).operations().current(); + + TableMetadata cachedMetadata = + MetadataCacheManager.loadTableMetadata( + tableIdentifier, + Integer.MAX_VALUE, + polarisContext, + metaStoreManager, + passthroughView, + () -> { + throw new IllegalStateException("Fell back even though a cache entry should exist!"); + }); + + // The metadata object is loaded from the cache + Assertions.assertThat(cachedMetadata).isNotSameAs(originalMetadata); + + // The content should match what was cached + Assertions.assertThat(TableMetadataParser.toJson(cachedMetadata)) + .isEqualTo(TableMetadataParser.toJson(originalMetadata)); + + // Update the table + TableOperations tableOps = catalog.newTableOps(tableIdentifier); + TableMetadata updatedMetadata = tableOps.current().updateSchema(buildSchema(100), 100); + tableOps.commit(tableOps.current(), updatedMetadata); + + // Read from the cache; it should detect a change due to the update and load the new fallback + TableMetadata reloadedMetadata = + MetadataCacheManager.loadTableMetadata( + tableIdentifier, + Integer.MAX_VALUE, + polarisContext, + metaStoreManager, + passthroughView, + () -> { + throw new IllegalStateException( + "Fell back even though a cache entry should be updated on write"); + }); + + Assertions.assertThat(reloadedMetadata).isNotSameAs(cachedMetadata); + Assertions.assertThat(reloadedMetadata.schema().columns().size()).isEqualTo(100); + } }