diff --git a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java index 3956e9192aaf..ee7666e9a7e9 100644 --- a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java +++ b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java @@ -28,7 +28,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; -import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; @@ -42,23 +41,30 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Objects; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.view.BaseMetastoreViewCatalog; +import org.apache.iceberg.view.BaseViewOperations; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.iceberg.view.ViewUtil; /** * Catalog implementation that uses in-memory data-structures to store the namespaces and tables. * This class doesn't touch external resources and can be utilized to write unit tests without side * effects. It uses {@link InMemoryFileIO}. */ -public class InMemoryCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Closeable { +public class InMemoryCatalog extends BaseMetastoreViewCatalog + implements SupportsNamespaces, Closeable { private static final Joiner SLASH = Joiner.on("/"); private static final Joiner DOT = Joiner.on("."); private final ConcurrentMap> namespaces; private final ConcurrentMap tables; + private final ConcurrentMap views; private FileIO io; private String catalogName; private String warehouseLocation; @@ -66,6 +72,7 @@ public class InMemoryCatalog extends BaseMetastoreCatalog implements SupportsNam public InMemoryCatalog() { this.namespaces = Maps.newConcurrentMap(); this.tables = Maps.newConcurrentMap(); + this.views = Maps.newConcurrentMap(); } @Override @@ -278,15 +285,69 @@ public List listNamespaces(Namespace namespace) throws NoSuchNamespac public void close() throws IOException { namespaces.clear(); tables.clear(); + views.clear(); + } + + @Override + public List listViews(Namespace namespace) { + if (!namespaceExists(namespace) && !namespace.isEmpty()) { + throw new NoSuchNamespaceException( + "Cannot list views for namespace. Namespace does not exist: %s", namespace); + } + + return views.keySet().stream() + .filter(v -> namespace.isEmpty() || v.namespace().equals(namespace)) + .sorted(Comparator.comparing(TableIdentifier::toString)) + .collect(Collectors.toList()); + } + + @Override + protected InMemoryViewOperations newViewOps(TableIdentifier identifier) { + return new InMemoryViewOperations(io, identifier); + } + + @Override + public boolean dropView(TableIdentifier identifier) { + return null != views.remove(identifier); + } + + @Override + public synchronized void renameView(TableIdentifier from, TableIdentifier to) { + if (from.equals(to)) { + return; + } + + if (!namespaceExists(to.namespace())) { + throw new NoSuchNamespaceException( + "Cannot rename %s to %s. Namespace does not exist: %s", from, to, to.namespace()); + } + + String fromViewLocation = views.get(from); + if (null == fromViewLocation) { + throw new NoSuchViewException("Cannot rename %s to %s. View does not exist", from, to); + } + + if (tables.containsKey(to)) { + throw new AlreadyExistsException("Cannot rename %s to %s. Table already exists", from, to); + } + + if (views.containsKey(to)) { + throw new AlreadyExistsException("Cannot rename %s to %s. View already exists", from, to); + } + + views.put(to, fromViewLocation); + views.remove(from); } private class InMemoryTableOperations extends BaseMetastoreTableOperations { private final FileIO fileIO; private final TableIdentifier tableIdentifier; + private final String fullTableName; InMemoryTableOperations(FileIO fileIO, TableIdentifier tableIdentifier) { this.fileIO = fileIO; this.tableIdentifier = tableIdentifier; + this.fullTableName = fullTableName(catalogName, tableIdentifier); } @Override @@ -300,8 +361,8 @@ public void doRefresh() { } @Override - public void doCommit(TableMetadata base, TableMetadata metadata) { - String newLocation = writeNewMetadata(metadata, currentVersion() + 1); + public synchronized void doCommit(TableMetadata base, TableMetadata metadata) { + String newLocation = writeNewMetadataIfRequired(base == null, metadata); String oldLocation = base == null ? null : base.metadataFileLocation(); if (null == base && !namespaceExists(tableIdentifier.namespace())) { @@ -310,6 +371,10 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { tableIdentifier, tableIdentifier.namespace()); } + if (views.containsKey(tableIdentifier)) { + throw new AlreadyExistsException("View with same name already exists: %s", tableIdentifier); + } + tables.compute( tableIdentifier, (k, existingLocation) -> { @@ -334,7 +399,72 @@ public FileIO io() { @Override protected String tableName() { - return tableIdentifier.toString(); + return fullTableName; + } + } + + private class InMemoryViewOperations extends BaseViewOperations { + private final FileIO io; + private final TableIdentifier identifier; + private final String fullViewName; + + InMemoryViewOperations(FileIO io, TableIdentifier identifier) { + this.io = io; + this.identifier = identifier; + this.fullViewName = ViewUtil.fullViewName(catalogName, identifier); + } + + @Override + public void doRefresh() { + String latestLocation = views.get(identifier); + if (latestLocation == null) { + disableRefresh(); + } else { + refreshFromMetadataLocation(latestLocation); + } + } + + @Override + public synchronized void doCommit(ViewMetadata base, ViewMetadata metadata) { + String newLocation = writeNewMetadataIfRequired(metadata); + String oldLocation = base == null ? null : currentMetadataLocation(); + + if (null == base && !namespaceExists(identifier.namespace())) { + throw new NoSuchNamespaceException( + "Cannot create view %s. Namespace does not exist: %s", + identifier, identifier.namespace()); + } + + if (tables.containsKey(identifier)) { + throw new AlreadyExistsException("Table with same name already exists: %s", identifier); + } + + views.compute( + identifier, + (k, existingLocation) -> { + if (!Objects.equal(existingLocation, oldLocation)) { + if (null == base) { + throw new AlreadyExistsException("View already exists: %s", identifier); + } + + throw new CommitFailedException( + "Cannot commit to view %s metadata location from %s to %s " + + "because it has been concurrently modified to %s", + identifier, oldLocation, newLocation, existingLocation); + } + + return newLocation; + }); + } + + @Override + public FileIO io() { + return io; + } + + @Override + protected String viewName() { + return fullViewName; } } } diff --git a/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java b/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java new file mode 100644 index 000000000000..4d1309a97424 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.view; + +import java.util.Map; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.ViewCatalog; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public abstract class BaseMetastoreViewCatalog extends BaseMetastoreCatalog implements ViewCatalog { + protected abstract ViewOperations newViewOps(TableIdentifier identifier); + + @Override + public void initialize(String name, Map properties) { + super.initialize(name, properties); + } + + @Override + public String name() { + return super.name(); + } + + @Override + public View loadView(TableIdentifier identifier) { + if (isValidIdentifier(identifier)) { + ViewOperations ops = newViewOps(identifier); + if (ops.current() == null) { + throw new NoSuchViewException("View does not exist: %s", identifier); + } else { + return new BaseView(newViewOps(identifier), ViewUtil.fullViewName(name(), identifier)); + } + } + + throw new NoSuchViewException("Invalid view identifier: %s", identifier); + } + + @Override + public ViewBuilder buildView(TableIdentifier identifier) { + return new BaseViewBuilder(identifier); + } + + protected class BaseViewBuilder implements ViewBuilder { + private final TableIdentifier identifier; + private final ImmutableViewVersion.Builder viewVersionBuilder = ImmutableViewVersion.builder(); + private final Map properties = Maps.newHashMap(); + private Schema schema; + + protected BaseViewBuilder(TableIdentifier identifier) { + Preconditions.checkArgument( + isValidIdentifier(identifier), "Invalid view identifier: %s", identifier); + this.identifier = identifier; + } + + @Override + public ViewBuilder withSchema(Schema newSchema) { + this.schema = newSchema; + viewVersionBuilder.schemaId(newSchema.schemaId()); + return this; + } + + @Override + public ViewBuilder withQuery(String dialect, String sql) { + viewVersionBuilder.addRepresentations( + ImmutableSQLViewRepresentation.builder().dialect(dialect).sql(sql).build()); + return this; + } + + @Override + public ViewBuilder withDefaultCatalog(String defaultCatalog) { + viewVersionBuilder.defaultCatalog(defaultCatalog); + return this; + } + + @Override + public ViewBuilder withDefaultNamespace(Namespace namespace) { + viewVersionBuilder.defaultNamespace(namespace); + return this; + } + + @Override + public ViewBuilder withProperties(Map newProperties) { + this.properties.putAll(newProperties); + return this; + } + + @Override + public ViewBuilder withProperty(String key, String value) { + this.properties.put(key, value); + return this; + } + + @Override + public View create() { + ViewOperations ops = newViewOps(identifier); + if (null != ops.current()) { + throw new AlreadyExistsException("View already exists: %s", identifier); + } + + ViewVersion viewVersion = + viewVersionBuilder + .versionId(1) + .timestampMillis(System.currentTimeMillis()) + .putSummary("operation", "create") + .build(); + + ViewMetadata viewMetadata = + ViewMetadata.builder() + .setProperties(properties) + .setLocation(defaultWarehouseLocation(identifier)) + .setCurrentVersion(viewVersion, schema) + .build(); + + try { + ops.commit(null, viewMetadata); + } catch (CommitFailedException ignored) { + throw new AlreadyExistsException("View was created concurrently: %s", identifier); + } + + return new BaseView(ops, ViewUtil.fullViewName(name(), identifier)); + } + + @Override + public View replace() { + ViewOperations ops = newViewOps(identifier); + if (null == ops.current()) { + throw new NoSuchViewException("View does not exist: %s", identifier); + } + + ViewMetadata metadata = ops.current(); + int maxVersionId = + metadata.versions().stream() + .map(ViewVersion::versionId) + .max(Integer::compareTo) + .orElseGet(metadata::currentVersionId); + + ViewVersion viewVersion = + viewVersionBuilder + .versionId(maxVersionId + 1) + .timestampMillis(System.currentTimeMillis()) + .putSummary("operation", "replace") + .build(); + + ViewMetadata replacement = + ViewMetadata.buildFrom(metadata) + .setProperties(properties) + .setCurrentVersion(viewVersion, schema) + .build(); + + try { + ops.commit(metadata, replacement); + } catch (CommitFailedException ignored) { + throw new AlreadyExistsException("View was updated concurrently: %s", identifier); + } + + return new BaseView(ops, ViewUtil.fullViewName(name(), identifier)); + } + + @Override + public View createOrReplace() { + if (null == newViewOps(identifier).current()) { + return create(); + } else { + return replace(); + } + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/BaseView.java b/core/src/main/java/org/apache/iceberg/view/BaseView.java new file mode 100644 index 000000000000..a21bc2381f90 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/BaseView.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.view; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.Schema; + +public class BaseView implements View, Serializable { + + private final ViewOperations ops; + private final String name; + + public BaseView(ViewOperations ops, String name) { + this.ops = ops; + this.name = name; + } + + @Override + public String name() { + return name; + } + + public ViewOperations operations() { + return ops; + } + + @Override + public Schema schema() { + return operations().current().schema(); + } + + @Override + public Map schemas() { + return operations().current().schemasById(); + } + + @Override + public ViewVersion currentVersion() { + return operations().current().currentVersion(); + } + + @Override + public Iterable versions() { + return operations().current().versions(); + } + + @Override + public ViewVersion version(int versionId) { + return operations().current().version(versionId); + } + + @Override + public List history() { + return operations().current().history(); + } + + @Override + public Map properties() { + return operations().current().properties(); + } + + @Override + public UpdateViewProperties updateProperties() { + return new PropertiesUpdate(ops); + } + + @Override + public ReplaceViewVersion replaceVersion() { + return new ViewVersionReplace(ops); + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java b/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java new file mode 100644 index 000000000000..f7270b9a35ed --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.view; + +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.Predicate; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class BaseViewOperations implements ViewOperations { + private static final Logger LOG = LoggerFactory.getLogger(BaseViewOperations.class); + + private static final String METADATA_FOLDER_NAME = "metadata"; + + private ViewMetadata currentMetadata = null; + private String currentMetadataLocation = null; + private boolean shouldRefresh = true; + private int version = -1; + + protected BaseViewOperations() {} + + protected void requestRefresh() { + this.shouldRefresh = true; + } + + protected void disableRefresh() { + this.shouldRefresh = false; + } + + protected abstract void doRefresh(); + + protected abstract void doCommit(ViewMetadata base, ViewMetadata metadata); + + protected abstract String viewName(); + + protected abstract FileIO io(); + + protected String currentMetadataLocation() { + return currentMetadataLocation; + } + + protected int currentVersion() { + return version; + } + + @Override + public ViewMetadata current() { + if (shouldRefresh) { + return refresh(); + } + + return currentMetadata; + } + + @Override + public ViewMetadata refresh() { + boolean currentMetadataWasAvailable = currentMetadata != null; + try { + doRefresh(); + } catch (NoSuchViewException e) { + if (currentMetadataWasAvailable) { + LOG.warn("Could not find the view during refresh, setting current metadata to null", e); + shouldRefresh = true; + } + + currentMetadata = null; + currentMetadataLocation = null; + version = -1; + throw e; + } + + return current(); + } + + @Override + public void commit(ViewMetadata base, ViewMetadata metadata) { + // if the metadata is already out of date, reject it + if (base != current()) { + if (base != null) { + throw new CommitFailedException("Cannot commit: stale view metadata"); + } else { + // when current is non-null, the view exists. but when base is null, the commit is trying + // to create the view + throw new AlreadyExistsException("View already exists: %s", viewName()); + } + } + + // if the metadata is not changed, return early + if (base == metadata) { + LOG.info("Nothing to commit."); + return; + } + + long start = System.currentTimeMillis(); + doCommit(base, metadata); + requestRefresh(); + + LOG.info( + "Successfully committed to view {} in {} ms", + viewName(), + System.currentTimeMillis() - start); + } + + private String writeNewMetadata(ViewMetadata metadata, int newVersion) { + String newMetadataFilePath = newMetadataFilePath(metadata, newVersion); + OutputFile newMetadataLocation = io().newOutputFile(newMetadataFilePath); + + // write the new metadata + // use overwrite to avoid negative caching in S3. this is safe because the metadata location is + // always unique because it includes a UUID. + ViewMetadataParser.overwrite(metadata, newMetadataLocation); + + return newMetadataLocation.location(); + } + + protected String writeNewMetadataIfRequired(ViewMetadata metadata) { + return null != metadata.metadataFileLocation() + ? metadata.metadataFileLocation() + : writeNewMetadata(metadata, version + 1); + } + + private String newMetadataFilePath(ViewMetadata metadata, int newVersion) { + return metadataFileLocation( + metadata, String.format("%05d-%s%s", newVersion, UUID.randomUUID(), ".metadata.json")); + } + + private String metadataFileLocation(ViewMetadata metadata, String filename) { + return String.format( + "%s/%s/%s", + LocationUtil.stripTrailingSlash(metadata.location()), METADATA_FOLDER_NAME, filename); + } + + protected void refreshFromMetadataLocation(String newLocation) { + refreshFromMetadataLocation(newLocation, null, 20); + } + + protected void refreshFromMetadataLocation( + String newLocation, Predicate shouldRetry, int numRetries) { + refreshFromMetadataLocation( + newLocation, + shouldRetry, + numRetries, + metadataLocation -> ViewMetadataParser.read(io().newInputFile(metadataLocation))); + } + + protected void refreshFromMetadataLocation( + String newLocation, + Predicate shouldRetry, + int numRetries, + Function metadataLoader) { + if (!Objects.equal(currentMetadataLocation, newLocation)) { + LOG.info("Refreshing view metadata from new version: {}", newLocation); + + AtomicReference newMetadata = new AtomicReference<>(); + Tasks.foreach(newLocation) + .retry(numRetries) + .exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */) + .throwFailureWhenFinished() + .stopRetryOn(NotFoundException.class) // overridden if shouldRetry is non-null + .shouldRetryTest(shouldRetry) + .run(metadataLocation -> newMetadata.set(metadataLoader.apply(metadataLocation))); + + this.currentMetadata = newMetadata.get(); + this.currentMetadataLocation = newLocation; + this.version = parseVersion(newLocation); + } + + this.shouldRefresh = false; + } + + /** + * Parse the version from view metadata file name. + * + * @param metadataLocation view metadata file location + * @return version of the view metadata file in success case and -1 if the version is not parsable + * (as a sign that the metadata is not part of this catalog) + */ + private static int parseVersion(String metadataLocation) { + int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0 + int versionEnd = metadataLocation.indexOf('-', versionStart); + if (versionEnd < 0) { + // found filesystem view's metadata + return -1; + } + + try { + return Integer.valueOf(metadataLocation.substring(versionStart, versionEnd)); + } catch (NumberFormatException e) { + LOG.warn("Unable to parse version from metadata location: {}", metadataLocation, e); + return -1; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java new file mode 100644 index 000000000000..8b2142c069ab --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.view; + +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; + +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.Tasks; + +class PropertiesUpdate implements UpdateViewProperties { + private final ViewOperations ops; + private final Map updates = Maps.newHashMap(); + private final Set removals = Sets.newHashSet(); + private ViewMetadata base; + + PropertiesUpdate(ViewOperations ops) { + this.ops = ops; + this.base = ops.current(); + } + + @Override + public Map apply() { + this.base = ops.refresh(); + + return internalApply(base).properties(); + } + + private ViewMetadata internalApply(ViewMetadata metadata) { + return ViewMetadata.buildFrom(metadata) + .setProperties(updates) + .removeProperties(removals) + .build(); + } + + @Override + public void commit() { + Tasks.foreach(ops) + .retry( + PropertyUtil.propertyAsInt( + base.properties(), COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) + .exponentialBackoff( + PropertyUtil.propertyAsInt( + base.properties(), COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + PropertyUtil.propertyAsInt( + base.properties(), COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + PropertyUtil.propertyAsInt( + base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), + 2.0 /* exponential */) + .onlyRetryOn(CommitFailedException.class) + .run(taskOps -> taskOps.commit(base, internalApply(base))); + } + + @Override + public UpdateViewProperties set(String key, String value) { + Preconditions.checkArgument(null != key, "Invalid key: null"); + Preconditions.checkArgument(null != value, "Invalid value: null"); + Preconditions.checkArgument( + !removals.contains(key), "Cannot remove and update the same key: %s", key); + + updates.put(key, value); + return this; + } + + @Override + public UpdateViewProperties remove(String key) { + Preconditions.checkArgument(null != key, "Invalid key: null"); + Preconditions.checkArgument( + !updates.containsKey(key), "Cannot remove and update the same key: %s", key); + + removals.add(key); + return this; + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java b/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java index fe2f2c8b684a..cb905bce09d6 100644 --- a/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java +++ b/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java @@ -26,6 +26,7 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.annotation.Nullable; import org.apache.iceberg.MetadataUpdate; import org.apache.iceberg.Schema; import org.apache.iceberg.exceptions.ValidationException; @@ -78,6 +79,9 @@ default Integer currentSchemaId() { List changes(); + @Nullable + String metadataFileLocation(); + default ViewVersion version(int versionId) { return versionsById().get(versionId); } @@ -145,6 +149,7 @@ class Builder { private int currentVersionId; private String location; private String uuid; + private String metadataLocation; // internal change tracking private Integer lastAddedVersionId = null; @@ -176,6 +181,7 @@ private Builder(ViewMetadata base) { this.currentVersionId = base.currentVersionId(); this.location = base.location(); this.uuid = base.uuid(); + this.metadataLocation = null; } public Builder upgradeFormatVersion(int newFormatVersion) { @@ -205,6 +211,11 @@ public Builder setLocation(String newLocation) { return this; } + public Builder setMetadataLocation(String newMetadataLocation) { + this.metadataLocation = newMetadataLocation; + return this; + } + public Builder setCurrentVersionId(int newVersionId) { if (newVersionId == LAST_ADDED) { ValidationException.check( @@ -375,6 +386,13 @@ public ViewMetadata build() { Preconditions.checkArgument(null != location, "Invalid location: null"); Preconditions.checkArgument(versions.size() > 0, "Invalid view: no versions were added"); + // when associated with a metadata file, metadata must have no changes so that the metadata + // matches exactly what is in the metadata file, which does not store changes. metadata + // location with changes is inconsistent. + Preconditions.checkArgument( + metadataLocation == null || changes.isEmpty(), + "Cannot create view metadata with a metadata location and changes"); + int historySize = PropertyUtil.propertyAsInt( properties, @@ -412,7 +430,8 @@ public ViewMetadata build() { retainedVersions, retainedHistory, properties, - changes); + changes, + metadataLocation); } static List expireVersions( diff --git a/core/src/main/java/org/apache/iceberg/view/ViewMetadataParser.java b/core/src/main/java/org/apache/iceberg/view/ViewMetadataParser.java index 0852db6a516b..7a29c87bad9c 100644 --- a/core/src/main/java/org/apache/iceberg/view/ViewMetadataParser.java +++ b/core/src/main/java/org/apache/iceberg/view/ViewMetadataParser.java @@ -90,12 +90,20 @@ static void toJson(ViewMetadata metadata, JsonGenerator gen) throws IOException gen.writeEndObject(); } + public static ViewMetadata fromJson(String metadataLocation, String json) { + return JsonUtil.parse(json, node -> ViewMetadataParser.fromJson(metadataLocation, node)); + } + public static ViewMetadata fromJson(String json) { Preconditions.checkArgument(json != null, "Cannot parse view metadata from null string"); return JsonUtil.parse(json, ViewMetadataParser::fromJson); } public static ViewMetadata fromJson(JsonNode json) { + return fromJson(null, json); + } + + public static ViewMetadata fromJson(String metadataLocation, JsonNode json) { Preconditions.checkArgument(json != null, "Cannot parse view metadata from null object"); Preconditions.checkArgument( json.isObject(), "Cannot parse view metadata from non-object: %s", json); @@ -142,7 +150,8 @@ public static ViewMetadata fromJson(JsonNode json) { versions, historyEntries, properties, - ImmutableList.of()); + ImmutableList.of(), + metadataLocation); } public static void overwrite(ViewMetadata metadata, OutputFile outputFile) { @@ -155,7 +164,7 @@ public static void write(ViewMetadata metadata, OutputFile outputFile) { public static ViewMetadata read(InputFile file) { try (InputStream is = file.newStream()) { - return fromJson(JsonUtil.mapper().readValue(is, JsonNode.class)); + return fromJson(file.location(), JsonUtil.mapper().readValue(is, JsonNode.class)); } catch (IOException e) { throw new UncheckedIOException(String.format("Failed to read json file: %s", file), e); } diff --git a/core/src/main/java/org/apache/iceberg/view/ViewOperations.java b/core/src/main/java/org/apache/iceberg/view/ViewOperations.java new file mode 100644 index 000000000000..f9b3a9436f7f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/ViewOperations.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.view; + +/** SPI interface to abstract view metadata access and updates. */ +public interface ViewOperations { + + /** + * Return the currently loaded view metadata, without checking for updates. + * + * @return view metadata + */ + ViewMetadata current(); + + /** + * Return the current view metadata after checking for updates. + * + * @return view metadata + */ + ViewMetadata refresh(); + + /** + * Replace the base view metadata with a new version. + * + *

This method should implement and document atomicity guarantees. + * + *

Implementations must check that the base metadata is current to avoid overwriting updates. + * Once the atomic commit operation succeeds, implementations must not perform any operations that + * may fail because failure in this method cannot be distinguished from commit failure. + * + *

Implementations must throw a {@link + * org.apache.iceberg.exceptions.CommitStateUnknownException} in cases where it cannot be + * determined if the commit succeeded or failed. For example if a network partition causes the + * confirmation of the commit to be lost, the implementation should throw a + * CommitStateUnknownException. This is important because downstream users of this API need to + * know whether they can clean up the commit or not, if the state is unknown then it is not safe + * to remove any files. All other exceptions will be treated as if the commit has failed. + * + * @param base view metadata on which changes were based + * @param metadata new view metadata with updates + */ + void commit(ViewMetadata base, ViewMetadata metadata); +} diff --git a/core/src/main/java/org/apache/iceberg/view/ViewUtil.java b/core/src/main/java/org/apache/iceberg/view/ViewUtil.java new file mode 100644 index 000000000000..b79e2c3ce3db --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/ViewUtil.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.view; + +import org.apache.iceberg.catalog.TableIdentifier; + +public class ViewUtil { + private ViewUtil() {} + + public static String fullViewName(String catalog, TableIdentifier ident) { + return catalog + "." + ident; + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java new file mode 100644 index 000000000000..d6bd655da1b2 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.view; + +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS; +import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS; +import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; + +import java.util.List; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.Tasks; + +class ViewVersionReplace implements ReplaceViewVersion { + private final ViewOperations ops; + private final List viewRepresentationsToAdd = Lists.newArrayList(); + private ViewMetadata base; + private Namespace defaultNamespace; + private String defaultCatalog; + private Schema schema; + + ViewVersionReplace(ViewOperations ops) { + this.ops = ops; + this.base = ops.current(); + } + + @Override + public ViewVersion apply() { + Preconditions.checkState( + !viewRepresentationsToAdd.isEmpty(), "Cannot replace view without specifying a query"); + Preconditions.checkState(null != schema, "Cannot replace view without specifying schema"); + + this.base = ops.refresh(); + + ViewVersion viewVersion = base.currentVersion(); + int maxVersionId = + base.versions().stream() + .map(ViewVersion::versionId) + .max(Integer::compareTo) + .orElseGet(viewVersion::versionId); + + return ImmutableViewVersion.builder() + .versionId(maxVersionId + 1) + .timestampMillis(System.currentTimeMillis()) + .schemaId(schema.schemaId()) + .defaultNamespace(defaultNamespace) + .defaultCatalog(defaultCatalog) + .putSummary("operation", "replace") + .addAllRepresentations(viewRepresentationsToAdd) + .build(); + } + + @Override + public void commit() { + Tasks.foreach(ops) + .retry( + PropertyUtil.propertyAsInt( + base.properties(), COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) + .exponentialBackoff( + PropertyUtil.propertyAsInt( + base.properties(), COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + PropertyUtil.propertyAsInt( + base.properties(), COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + PropertyUtil.propertyAsInt( + base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), + 2.0 /* exponential */) + .onlyRetryOn(CommitFailedException.class) + .run( + taskOps -> { + ViewVersion newVersion = apply(); + // nothing to do if the version didn't change + if (this.base.currentVersion().equals(newVersion)) { + return; + } + + ViewMetadata updated = + ViewMetadata.buildFrom(this.base).setCurrentVersion(newVersion, schema).build(); + + taskOps.commit(base, updated); + }); + } + + @Override + public ReplaceViewVersion withQuery(String dialect, String sql) { + viewRepresentationsToAdd.add( + ImmutableSQLViewRepresentation.builder().dialect(dialect).sql(sql).build()); + return this; + } + + @Override + public ReplaceViewVersion withSchema(Schema newSchema) { + this.schema = newSchema; + return this; + } + + @Override + public ReplaceViewVersion withDefaultCatalog(String catalog) { + this.defaultCatalog = catalog; + return this; + } + + @Override + public ReplaceViewVersion withDefaultNamespace(Namespace namespace) { + this.defaultNamespace = namespace; + return this; + } +} diff --git a/core/src/test/java/org/apache/iceberg/inmemory/TestInMemoryViewCatalog.java b/core/src/test/java/org/apache/iceberg/inmemory/TestInMemoryViewCatalog.java new file mode 100644 index 000000000000..76731f58a6be --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/inmemory/TestInMemoryViewCatalog.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.inmemory; + +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.view.ViewCatalogTests; +import org.junit.jupiter.api.BeforeEach; + +public class TestInMemoryViewCatalog extends ViewCatalogTests { + private InMemoryCatalog catalog; + + @BeforeEach + public void before() { + this.catalog = new InMemoryCatalog(); + this.catalog.initialize("in-memory-catalog", ImmutableMap.of()); + } + + @Override + protected InMemoryCatalog catalog() { + return catalog; + } + + @Override + protected Catalog tableCatalog() { + return catalog; + } + + @Override + protected boolean requiresNamespaceCreate() { + return true; + } +} diff --git a/core/src/test/java/org/apache/iceberg/view/TestViewMetadata.java b/core/src/test/java/org/apache/iceberg/view/TestViewMetadata.java index acb344ffab97..b525068cdff8 100644 --- a/core/src/test/java/org/apache/iceberg/view/TestViewMetadata.java +++ b/core/src/test/java/org/apache/iceberg/view/TestViewMetadata.java @@ -522,4 +522,42 @@ public void uuidAssignment() { .isInstanceOf(IllegalArgumentException.class) .hasMessage("Cannot reassign uuid"); } + + @Test + public void viewMetadataWithMetadataLocation() { + Schema schema = new Schema(1, Types.NestedField.required(1, "x", Types.LongType.get())); + ViewVersion viewVersion = + ImmutableViewVersion.builder() + .schemaId(schema.schemaId()) + .versionId(1) + .timestampMillis(23L) + .putSummary("operation", "a") + .defaultNamespace(Namespace.of("ns")) + .build(); + + assertThatThrownBy( + () -> + ViewMetadata.builder() + .setLocation("custom-location") + .setMetadataLocation("metadata-location") + .addSchema(schema) + .addVersion(viewVersion) + .setCurrentVersionId(1) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot create view metadata with a metadata location and changes"); + + // setting metadata location without changes is ok + ViewMetadata viewMetadata = + ViewMetadata.buildFrom( + ViewMetadata.builder() + .setLocation("custom-location") + .addSchema(schema) + .addVersion(viewVersion) + .setCurrentVersionId(1) + .build()) + .setMetadataLocation("metadata-location") + .build(); + assertThat(viewMetadata.metadataFileLocation()).isEqualTo("metadata-location"); + } } diff --git a/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java b/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java index 5efbcf026f08..aeb9d46a9f3a 100644 --- a/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java +++ b/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java @@ -162,4 +162,68 @@ private String readViewMetadataInputFile(String fileName) throws Exception { Path path = Paths.get(getClass().getClassLoader().getResource(fileName).toURI()); return String.join("", java.nio.file.Files.readAllLines(path)); } + + @Test + public void viewMetadataWithMetadataLocation() throws Exception { + ViewVersion version1 = + ImmutableViewVersion.builder() + .versionId(1) + .timestampMillis(4353L) + .summary(ImmutableMap.of("operation", "create")) + .schemaId(1) + .defaultCatalog("some-catalog") + .defaultNamespace(Namespace.empty()) + .addRepresentations( + ImmutableSQLViewRepresentation.builder() + .sql("select 'foo' foo") + .dialect("spark-sql") + .build()) + .build(); + + ViewVersion version2 = + ImmutableViewVersion.builder() + .versionId(2) + .schemaId(1) + .timestampMillis(5555L) + .summary(ImmutableMap.of("operation", "replace")) + .defaultCatalog("some-catalog") + .defaultNamespace(Namespace.empty()) + .addRepresentations( + ImmutableSQLViewRepresentation.builder() + .sql("select 1 id, 'abc' data") + .dialect("spark-sql") + .build()) + .build(); + + String json = readViewMetadataInputFile("org/apache/iceberg/view/ValidViewMetadata.json"); + String metadataLocation = "s3://bucket/test/location/metadata/v1.metadata.json"; + ViewMetadata expectedViewMetadata = + ViewMetadata.buildFrom( + ViewMetadata.builder() + .assignUUID("fa6506c3-7681-40c8-86dc-e36561f83385") + .addSchema(TEST_SCHEMA) + .addVersion(version1) + .addVersion(version2) + .setLocation("s3://bucket/test/location") + .setProperties(ImmutableMap.of("some-key", "some-value")) + .setCurrentVersionId(2) + .upgradeFormatVersion(1) + .build()) + .setMetadataLocation(metadataLocation) + .build(); + + ViewMetadata actual = ViewMetadataParser.fromJson(metadataLocation, json); + assertThat(actual) + .usingRecursiveComparison() + .ignoringFieldsOfTypes(Schema.class) + .isEqualTo(expectedViewMetadata); + + actual = + ViewMetadataParser.fromJson( + metadataLocation, ViewMetadataParser.toJson(expectedViewMetadata)); + assertThat(actual) + .usingRecursiveComparison() + .ignoringFieldsOfTypes(Schema.class) + .isEqualTo(expectedViewMetadata); + } } diff --git a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java new file mode 100644 index 000000000000..e4629dc020c7 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java @@ -0,0 +1,943 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.view; + +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.Arrays; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.ViewCatalog; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assumptions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public abstract class ViewCatalogTests { + protected static final Schema SCHEMA = + new Schema( + 1, + required(3, "id", Types.IntegerType.get(), "unique ID"), + required(4, "data", Types.StringType.get())); + + private static final Schema OTHER_SCHEMA = + new Schema(2, required(1, "some_id", Types.IntegerType.get())); + + protected abstract C catalog(); + + protected abstract Catalog tableCatalog(); + + protected boolean requiresNamespaceCreate() { + return false; + } + + @Test + public void basicCreateView() { + TableIdentifier identifier = TableIdentifier.of("ns", "view"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(identifier.namespace()); + } + + assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse(); + + View view = + catalog() + .buildView(identifier) + .withSchema(SCHEMA) + .withDefaultNamespace(identifier.namespace()) + .withQuery("spark", "select * from ns.tbl") + .create(); + + assertThat(view).isNotNull(); + assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue(); + + // validate view settings + assertThat(view.name()).isEqualTo(catalog().name() + "." + identifier); + assertThat(view.properties()).isEmpty(); + assertThat(view.history()) + .hasSize(1) + .first() + .extracting(ViewHistoryEntry::versionId) + .isEqualTo(1); + assertThat(view.schemas()).hasSize(1).containsKey(SCHEMA.schemaId()); + assertThat(view.schema().asStruct()).isEqualTo(SCHEMA.asStruct()); + assertThat(view.versions()).hasSize(1).containsExactly(view.currentVersion()); + + assertThat(view.currentVersion()) + .isEqualTo( + ImmutableViewVersion.builder() + .timestampMillis(view.currentVersion().timestampMillis()) + .versionId(1) + .schemaId(SCHEMA.schemaId()) + .putSummary("operation", "create") + .defaultNamespace(identifier.namespace()) + .addRepresentations( + ImmutableSQLViewRepresentation.builder() + .sql("select * from ns.tbl") + .dialect("spark") + .build()) + .build()); + + assertThat(catalog().dropView(identifier)).isTrue(); + assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse(); + } + + @Test + public void completeCreateView() { + TableIdentifier identifier = TableIdentifier.of("ns", "view"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(identifier.namespace()); + } + + assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse(); + + View view = + catalog() + .buildView(identifier) + .withSchema(SCHEMA) + .withDefaultNamespace(identifier.namespace()) + .withQuery("spark", "select * from ns.tbl") + .withQuery("trino", "select * from ns.tbl using X") + .withProperty("prop1", "val1") + .withProperty("prop2", "val2") + .create(); + + assertThat(view).isNotNull(); + assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue(); + + // validate view settings + assertThat(view.name()).isEqualTo(catalog().name() + "." + identifier); + assertThat(view.properties()).isEqualTo(ImmutableMap.of("prop1", "val1", "prop2", "val2")); + assertThat(view.history()) + .hasSize(1) + .first() + .extracting(ViewHistoryEntry::versionId) + .isEqualTo(1); + assertThat(view.schema().schemaId()).isEqualTo(SCHEMA.schemaId()); + assertThat(view.schema().asStruct()).isEqualTo(SCHEMA.asStruct()); + assertThat(view.schemas()).hasSize(1).containsKey(SCHEMA.schemaId()); + assertThat(view.versions()).hasSize(1).containsExactly(view.currentVersion()); + + assertThat(view.currentVersion()) + .isEqualTo( + ImmutableViewVersion.builder() + .timestampMillis(view.currentVersion().timestampMillis()) + .versionId(1) + .schemaId(SCHEMA.schemaId()) + .putSummary("operation", "create") + .defaultNamespace(identifier.namespace()) + .addRepresentations( + ImmutableSQLViewRepresentation.builder() + .sql("select * from ns.tbl") + .dialect("spark") + .build()) + .addRepresentations( + ImmutableSQLViewRepresentation.builder() + .sql("select * from ns.tbl using X") + .dialect("trino") + .build()) + .build()); + + assertThat(catalog().dropView(identifier)).isTrue(); + assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse(); + } + + @Test + public void createViewThatAlreadyExists() { + TableIdentifier viewIdentifier = TableIdentifier.of("ns", "view"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(viewIdentifier.namespace()); + } + + assertThat(catalog().viewExists(viewIdentifier)).isFalse(); + + View view = + catalog() + .buildView(viewIdentifier) + .withSchema(SCHEMA) + .withDefaultNamespace(viewIdentifier.namespace()) + .withQuery("spark", "select * from ns.tbl") + .create(); + + assertThat(view).isNotNull(); + assertThat(catalog().viewExists(viewIdentifier)).isTrue(); + + assertThatThrownBy( + () -> + catalog() + .buildView(viewIdentifier) + .withSchema(OTHER_SCHEMA) + .withQuery("spark", "select * from ns.tbl") + .withDefaultNamespace(viewIdentifier.namespace()) + .create()) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageStartingWith("View already exists: ns.view"); + } + + @Test + public void createViewThatAlreadyExistsAsTable() { + Assumptions.assumeThat(tableCatalog()) + .as("Only valid for catalogs that support tables") + .isNotNull(); + + TableIdentifier tableIdentifier = TableIdentifier.of("ns", "table"); + TableIdentifier viewIdentifier = TableIdentifier.of("ns", "view"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(viewIdentifier.namespace()); + } + + assertThat(catalog().viewExists(viewIdentifier)).isFalse(); + + View view = + catalog() + .buildView(viewIdentifier) + .withSchema(SCHEMA) + .withDefaultNamespace(viewIdentifier.namespace()) + .withQuery("spark", "select * from ns.tbl") + .create(); + + assertThat(view).isNotNull(); + assertThat(catalog().viewExists(tableIdentifier)).isFalse(); + assertThat(catalog().viewExists(viewIdentifier)).isTrue(); + + assertThatThrownBy( + () -> + catalog() + .buildView(viewIdentifier) + .withSchema(OTHER_SCHEMA) + .withQuery("spark", "select * from ns.tbl") + .withDefaultNamespace(viewIdentifier.namespace()) + .create()) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageStartingWith("View already exists: ns.view"); + + tableCatalog().buildTable(tableIdentifier, SCHEMA).create(); + assertThat(tableCatalog().tableExists(tableIdentifier)).isTrue(); + assertThat(catalog().viewExists(tableIdentifier)).isFalse(); + + assertThatThrownBy( + () -> + catalog() + .buildView(tableIdentifier) + .withSchema(OTHER_SCHEMA) + .withDefaultNamespace(tableIdentifier.namespace()) + .withQuery("spark", "select * from ns.tbl") + .create()) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageStartingWith("Table with same name already exists: ns.table"); + } + + @Test + public void createTableThatAlreadyExistsAsView() { + Assumptions.assumeThat(tableCatalog()) + .as("Only valid for catalogs that support tables") + .isNotNull(); + + TableIdentifier viewOne = TableIdentifier.of("ns", "viewOne"); + TableIdentifier viewTwo = TableIdentifier.of("ns", "viewTwo"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(viewOne.namespace()); + } + + assertThat(catalog().viewExists(viewOne)).isFalse(); + assertThat(tableCatalog().tableExists(viewTwo)).isFalse(); + + for (TableIdentifier identifier : Arrays.asList(viewTwo, viewOne)) { + assertThat( + catalog() + .buildView(identifier) + .withSchema(SCHEMA) + .withDefaultNamespace(identifier.namespace()) + .withQuery("spark", "select * from ns.tbl") + .create()) + .isNotNull(); + + assertThat(catalog().viewExists(identifier)).isTrue(); + assertThat(tableCatalog().tableExists(identifier)).isFalse(); + } + + assertThatThrownBy(() -> tableCatalog().buildTable(viewTwo, SCHEMA).create()) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageStartingWith("View with same name already exists: ns.viewTwo"); + } + + @Test + public void renameView() { + TableIdentifier from = TableIdentifier.of("ns", "view"); + TableIdentifier to = TableIdentifier.of("ns", "renamedView"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(from.namespace()); + } + + catalog() + .buildView(from) + .withSchema(SCHEMA) + .withDefaultNamespace(from.namespace()) + .withQuery("spark", "select * from ns.tbl") + .create(); + + assertThat(catalog().viewExists(from)).as("View should exist").isTrue(); + assertThat(catalog().listViews(from.namespace())).containsExactly(from); + + catalog().renameView(from, to); + + assertThat(catalog().listViews(to.namespace())).containsExactly(to); + assertThat(catalog().viewExists(from)).as("View should not exist").isFalse(); + assertThat(catalog().viewExists(to)).as("View should exist").isTrue(); + + View view = catalog().loadView(to); + assertThat(view).isNotNull(); + + // validate view settings + assertThat(view.name()).isEqualTo(catalog().name() + "." + to); + assertThat(view.properties()).isEmpty(); + assertThat(view.history()) + .hasSize(1) + .first() + .extracting(ViewHistoryEntry::versionId) + .isEqualTo(1); + assertThat(view.schema().asStruct()).isEqualTo(SCHEMA.asStruct()); + assertThat(view.schemas()).hasSize(1).containsKey(SCHEMA.schemaId()); + assertThat(view.versions()).hasSize(1).containsExactly(view.currentVersion()); + + assertThat(view.currentVersion()) + .isEqualTo( + ImmutableViewVersion.builder() + .timestampMillis(view.currentVersion().timestampMillis()) + .versionId(1) + .schemaId(SCHEMA.schemaId()) + .putSummary("operation", "create") + .defaultNamespace(to.namespace()) + .addRepresentations( + ImmutableSQLViewRepresentation.builder() + .sql("select * from ns.tbl") + .dialect("spark") + .build()) + .build()); + + assertThat(catalog().dropView(to)).isTrue(); + assertThat(catalog().viewExists(to)).as("View should not exist").isFalse(); + } + + @Test + public void renameViewUsingDifferentNamespace() { + TableIdentifier from = TableIdentifier.of("ns", "view"); + TableIdentifier to = TableIdentifier.of("other_ns", "renamedView"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(from.namespace()); + catalog().createNamespace(to.namespace()); + } + + catalog() + .buildView(from) + .withSchema(SCHEMA) + .withDefaultNamespace(from.namespace()) + .withQuery("spark", "select * from ns.tbl") + .create(); + + assertThat(catalog().listViews(from.namespace())).containsExactly(from); + assertThat(catalog().viewExists(from)).as("View should exist").isTrue(); + + catalog().renameView(from, to); + + assertThat(catalog().listViews(to.namespace())).containsExactly(to); + assertThat(catalog().viewExists(from)).as("View should not exist").isFalse(); + assertThat(catalog().viewExists(to)).as("View should exist").isTrue(); + + View view = catalog().loadView(to); + assertThat(view).isNotNull(); + + // validate view settings + assertThat(view.name()).isEqualTo(catalog().name() + "." + to); + assertThat(view.properties()).isEmpty(); + assertThat(view.history()) + .hasSize(1) + .first() + .extracting(ViewHistoryEntry::versionId) + .isEqualTo(1); + assertThat(view.schema().asStruct()).isEqualTo(SCHEMA.asStruct()); + assertThat(view.schemas()).hasSize(1).containsKey(SCHEMA.schemaId()); + assertThat(view.versions()).hasSize(1).containsExactly(view.currentVersion()); + + assertThat(view.currentVersion()) + .isEqualTo( + ImmutableViewVersion.builder() + .timestampMillis(view.currentVersion().timestampMillis()) + .versionId(1) + .schemaId(SCHEMA.schemaId()) + .putSummary("operation", "create") + .defaultNamespace(from.namespace()) + .addRepresentations( + ImmutableSQLViewRepresentation.builder() + .sql("select * from ns.tbl") + .dialect("spark") + .build()) + .build()); + + assertThat(catalog().dropView(from)).isFalse(); + assertThat(catalog().dropView(to)).isTrue(); + assertThat(catalog().viewExists(to)).as("View should not exist").isFalse(); + } + + @Test + public void renameViewNamespaceMissing() { + TableIdentifier from = TableIdentifier.of("ns", "view"); + TableIdentifier to = TableIdentifier.of("non_existing", "renamedView"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(from.namespace()); + } + + catalog() + .buildView(from) + .withSchema(SCHEMA) + .withDefaultNamespace(from.namespace()) + .withQuery("spark", "select * from ns.tbl") + .create(); + + assertThat(catalog().viewExists(from)).as("View should exist").isTrue(); + + assertThatThrownBy(() -> catalog().renameView(from, to)) + .isInstanceOf(NoSuchNamespaceException.class) + .hasMessageContaining("Namespace does not exist: non_existing"); + } + + @Test + public void renameViewSourceMissing() { + TableIdentifier from = TableIdentifier.of("ns", "view"); + TableIdentifier to = TableIdentifier.of("ns", "renamedView"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(from.namespace()); + } + + assertThat(catalog().viewExists(from)).as("View should not exist").isFalse(); + + assertThatThrownBy(() -> catalog().renameView(from, to)) + .isInstanceOf(NoSuchViewException.class) + .hasMessageContaining("View does not exist"); + + assertThat(catalog().viewExists(from)).as("View should not exist").isFalse(); + assertThat(catalog().viewExists(to)).as("View should not exist").isFalse(); + } + + @Test + public void renameViewTargetAlreadyExists() { + TableIdentifier from = TableIdentifier.of("ns", "view"); + TableIdentifier to = TableIdentifier.of("ns", "renamedView"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(from.namespace()); + } + + for (TableIdentifier viewIdentifier : ImmutableList.of(from, to)) { + catalog() + .buildView(viewIdentifier) + .withSchema(SCHEMA) + .withDefaultNamespace(from.namespace()) + .withQuery("spark", "select * from ns.tbl") + .create(); + } + + assertThatThrownBy(() -> catalog().renameView(from, to)) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageContaining("Cannot rename ns.view to ns.renamedView. View already exists"); + + // rename view where a table with the same name already exists + TableIdentifier identifier = TableIdentifier.of("ns", "tbl"); + tableCatalog().buildTable(identifier, SCHEMA).create(); + + assertThatThrownBy(() -> catalog().renameView(from, identifier)) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageContaining("Cannot rename ns.view to ns.tbl. Table already exists"); + } + + @Test + public void listViews() { + Namespace ns1 = Namespace.of("ns1"); + Namespace ns2 = Namespace.of("ns2"); + + TableIdentifier tableIdentifier = TableIdentifier.of(ns1, "table"); + TableIdentifier view1 = TableIdentifier.of(ns1, "view1"); + TableIdentifier view2 = TableIdentifier.of(ns2, "view2"); + TableIdentifier view3 = TableIdentifier.of(ns2, "view3"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(ns1); + catalog().createNamespace(ns2); + } + + if (null != tableCatalog()) { + tableCatalog().buildTable(tableIdentifier, SCHEMA).create(); + assertThat(tableCatalog().listTables(ns1)).containsExactly(tableIdentifier); + assertThat(tableCatalog().listTables(ns2)).isEmpty(); + } + + assertThat(catalog().listViews(ns1)).isEmpty(); + assertThat(catalog().listViews(ns2)).isEmpty(); + + catalog() + .buildView(view1) + .withSchema(SCHEMA) + .withDefaultNamespace(view1.namespace()) + .withQuery("spark", "select * from ns1.tbl") + .create(); + + assertThat(catalog().listViews(ns1)).containsExactly(view1); + assertThat(catalog().listViews(ns2)).isEmpty(); + + catalog() + .buildView(view2) + .withSchema(SCHEMA) + .withDefaultNamespace(view2.namespace()) + .withQuery("spark", "select * from ns1.tbl") + .create(); + + assertThat(catalog().listViews(ns1)).containsExactly(view1); + assertThat(catalog().listViews(ns2)).containsExactly(view2); + + catalog() + .buildView(view3) + .withSchema(SCHEMA) + .withDefaultNamespace(view3.namespace()) + .withQuery("spark", "select * from ns.tbl") + .create(); + + assertThat(catalog().listViews(ns1)).containsExactly(view1); + assertThat(catalog().listViews(ns2)).containsExactlyInAnyOrder(view2, view3); + + if (null != tableCatalog()) { + assertThat(tableCatalog().listTables(ns1)).containsExactly(tableIdentifier); + assertThat(tableCatalog().listTables(ns2)).isEmpty(); + } + + assertThat(catalog().dropView(view2)).isTrue(); + assertThat(catalog().listViews(ns1)).containsExactly(view1); + assertThat(catalog().listViews(ns2)).containsExactly(view3); + + assertThat(catalog().dropView(view3)).isTrue(); + assertThat(catalog().listViews(ns1)).containsExactly(view1); + assertThat(catalog().listViews(ns2)).isEmpty(); + + assertThat(catalog().dropView(view1)).isTrue(); + assertThat(catalog().listViews(ns1)).isEmpty(); + assertThat(catalog().listViews(ns2)).isEmpty(); + } + + @ParameterizedTest(name = ".createOrReplace() = {arguments}") + @ValueSource(booleans = {false, true}) + public void createOrReplaceView(boolean useCreateOrReplace) { + TableIdentifier identifier = TableIdentifier.of("ns", "view"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(identifier.namespace()); + } + + ViewBuilder viewBuilder = + catalog() + .buildView(identifier) + .withSchema(SCHEMA) + .withDefaultNamespace(identifier.namespace()) + .withQuery("spark", "select * from ns.tbl") + .withProperty("prop1", "val1") + .withProperty("prop2", "val2"); + View view = useCreateOrReplace ? viewBuilder.createOrReplace() : viewBuilder.create(); + + assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue(); + + // validate view settings + assertThat(view.name()).isEqualTo(catalog().name() + "." + identifier); + assertThat(view.properties()).isEqualTo(ImmutableMap.of("prop1", "val1", "prop2", "val2")); + assertThat(view.history()) + .hasSize(1) + .first() + .extracting(ViewHistoryEntry::versionId) + .isEqualTo(1); + assertThat(view.schema().schemaId()).isEqualTo(SCHEMA.schemaId()); + assertThat(view.schema().asStruct()).isEqualTo(SCHEMA.asStruct()); + assertThat(view.schemas()).hasSize(1).containsKey(SCHEMA.schemaId()); + assertThat(view.versions()).hasSize(1).containsExactly(view.currentVersion()); + + ViewVersion viewVersion = view.currentVersion(); + assertThat(viewVersion).isNotNull(); + assertThat(viewVersion.versionId()).isEqualTo(1); + assertThat(viewVersion.schemaId()).isEqualTo(SCHEMA.schemaId()); + assertThat(viewVersion.summary()).hasSize(1).containsEntry("operation", "create"); + assertThat(viewVersion.operation()).isEqualTo("create"); + assertThat(viewVersion.defaultNamespace()).isEqualTo(identifier.namespace()); + assertThat(viewVersion.representations()) + .hasSize(1) + .containsExactly( + ImmutableSQLViewRepresentation.builder() + .sql("select * from ns.tbl") + .dialect("spark") + .build()); + + viewBuilder = + catalog() + .buildView(identifier) + .withSchema(OTHER_SCHEMA) + .withDefaultNamespace(identifier.namespace()) + .withQuery("trino", "select count(*) from ns.tbl") + .withProperty("replacedProp1", "val1") + .withProperty("replacedProp2", "val2"); + View replacedView = useCreateOrReplace ? viewBuilder.createOrReplace() : viewBuilder.replace(); + + // validate replaced view settings + assertThat(replacedView.name()).isEqualTo(catalog().name() + "." + identifier); + assertThat(replacedView.properties()) + .hasSize(4) + .containsEntry("prop1", "val1") + .containsEntry("prop2", "val2") + .containsEntry("replacedProp1", "val1") + .containsEntry("replacedProp2", "val2"); + assertThat(replacedView.history()) + .hasSize(2) + .first() + .extracting(ViewHistoryEntry::versionId) + .isEqualTo(1); + assertThat(replacedView.history()) + .element(1) + .extracting(ViewHistoryEntry::versionId) + .isEqualTo(2); + + assertThat(replacedView.schema().schemaId()).isEqualTo(OTHER_SCHEMA.schemaId()); + assertThat(replacedView.schema().asStruct()).isEqualTo(OTHER_SCHEMA.asStruct()); + assertThat(replacedView.schemas()) + .hasSize(2) + .containsKey(SCHEMA.schemaId()) + .containsKey(OTHER_SCHEMA.schemaId()); + + ViewVersion replacedViewVersion = replacedView.currentVersion(); + assertThat(replacedView.versions()) + .hasSize(2) + .containsExactly(viewVersion, replacedViewVersion); + assertThat(replacedViewVersion).isNotNull(); + assertThat(replacedViewVersion.versionId()).isEqualTo(2); + assertThat(replacedViewVersion.schemaId()).isEqualTo(OTHER_SCHEMA.schemaId()); + assertThat(replacedViewVersion.operation()).isEqualTo("replace"); + assertThat(replacedViewVersion.summary()).hasSize(1).containsEntry("operation", "replace"); + assertThat(replacedViewVersion.representations()) + .hasSize(1) + .containsExactly( + ImmutableSQLViewRepresentation.builder() + .sql("select count(*) from ns.tbl") + .dialect("trino") + .build()); + + assertThat(catalog().dropView(identifier)).isTrue(); + assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse(); + } + + @Test + public void updateViewProperties() { + TableIdentifier identifier = TableIdentifier.of("ns", "view"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(identifier.namespace()); + } + + assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse(); + + View view = + catalog() + .buildView(identifier) + .withSchema(SCHEMA) + .withDefaultNamespace(identifier.namespace()) + .withQuery("spark", "select * from ns.tbl") + .create(); + + assertThat(view.properties()).isEmpty(); + ViewVersion viewVersion = view.currentVersion(); + assertThat(viewVersion.operation()).isEqualTo("create"); + assertThat(viewVersion.versionId()).isEqualTo(1); + assertThat(view.history()).hasSize(1); + assertThat(view.schemas()).hasSize(1).containsKey(SCHEMA.schemaId()); + assertThat(view.versions()).hasSize(1).containsExactly(viewVersion); + + assertThatThrownBy( + () -> catalog().loadView(identifier).updateProperties().set(null, "new-val1").commit()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid key: null"); + + assertThatThrownBy( + () -> catalog().loadView(identifier).updateProperties().set("key1", null).commit()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid value: null"); + + assertThatThrownBy( + () -> catalog().loadView(identifier).updateProperties().remove(null).commit()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid key: null"); + + assertThatThrownBy( + () -> + catalog() + .loadView(identifier) + .updateProperties() + .set("key1", "x") + .set("key3", "y") + .remove("key2") + .set("key2", "z") + .commit()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot remove and update the same key: key2"); + + view.updateProperties().set("key1", "val1").set("key2", "val2").remove("non-existing").commit(); + + View updatedView = catalog().loadView(identifier); + assertThat(updatedView.properties()) + .hasSize(2) + .containsEntry("key1", "val1") + .containsEntry("key2", "val2"); + assertThat(updatedView.history()).hasSize(1).isEqualTo(view.history()); + assertThat(updatedView.schemas()).hasSize(1).containsKey(SCHEMA.schemaId()); + assertThat(updatedView.versions()).hasSize(1).containsExactly(viewVersion); + + // updating properties doesn't change the view version + ViewVersion updatedViewVersion = updatedView.currentVersion(); + assertThat(updatedViewVersion).isNotNull(); + assertThat(updatedViewVersion.versionId()).isEqualTo(viewVersion.versionId()); + assertThat(updatedViewVersion.summary()).isEqualTo(viewVersion.summary()); + assertThat(updatedViewVersion.operation()).isEqualTo(viewVersion.operation()); + + assertThatThrownBy( + () -> + catalog() + .loadView(identifier) + .updateProperties() + .set("key1", "new-val1") + .set("key3", "val3") + .remove("key2") + .set("key2", "new-val2") + .commit()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot remove and update the same key: key2"); + + view.updateProperties().set("key1", "new-val1").set("key3", "val3").remove("key2").commit(); + + View updatedView2 = catalog().loadView(identifier); + assertThat(updatedView2.properties()) + .hasSize(2) + .containsEntry("key1", "new-val1") + .containsEntry("key3", "val3"); + assertThat(updatedView2.history()).hasSize(1).isEqualTo(view.history()); + assertThat(updatedView2.schemas()).hasSize(1).containsKey(SCHEMA.schemaId()); + assertThat(updatedView2.versions()).hasSize(1).containsExactly(viewVersion); + + ViewVersion updatedViewVersion2 = updatedView2.currentVersion(); + assertThat(updatedViewVersion2).isNotNull(); + assertThat(updatedViewVersion2.versionId()).isEqualTo(viewVersion.versionId()); + assertThat(updatedViewVersion2.summary()).isEqualTo(viewVersion.summary()); + assertThat(updatedViewVersion2.operation()).isEqualTo(viewVersion.operation()); + + assertThat(catalog().dropView(identifier)).isTrue(); + assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse(); + } + + @Test + public void replaceViewVersion() { + TableIdentifier identifier = TableIdentifier.of("ns", "view"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(identifier.namespace()); + } + + assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse(); + + SQLViewRepresentation spark = + ImmutableSQLViewRepresentation.builder() + .dialect("spark") + .sql("select * from ns.tbl") + .build(); + + SQLViewRepresentation trino = + ImmutableSQLViewRepresentation.builder() + .sql("select * from ns.tbl") + .dialect("trino") + .build(); + + View view = + catalog() + .buildView(identifier) + .withSchema(SCHEMA) + .withDefaultNamespace(identifier.namespace()) + .withQuery(trino.dialect(), trino.sql()) + .withQuery(spark.dialect(), spark.sql()) + .create(); + + ViewVersion viewVersion = view.currentVersion(); + assertThat(view.properties()).isEmpty(); + assertThat(view.history()) + .hasSize(1) + .first() + .extracting(ViewHistoryEntry::versionId) + .isEqualTo(viewVersion.versionId()); + assertThat(view.history()) + .hasSize(1) + .first() + .extracting(ViewHistoryEntry::versionId) + .isEqualTo(view.currentVersion().versionId()); + assertThat(view.schemas()).hasSize(1).containsKey(SCHEMA.schemaId()); + assertThat(viewVersion.operation()).isEqualTo("create"); + assertThat(viewVersion.versionId()).isEqualTo(1); + assertThat(viewVersion.representations()).hasSize(2).containsExactly(trino, spark); + assertThat(view.versions()).hasSize(1).containsExactly(viewVersion); + + view.replaceVersion() + .withSchema(OTHER_SCHEMA) + .withQuery(trino.dialect(), trino.sql()) + .withDefaultCatalog("default") + .withDefaultNamespace(identifier.namespace()) + .commit(); + + View updatedView = catalog().loadView(identifier); + assertThat(updatedView.properties()).isEmpty(); + assertThat(updatedView.history()) + .hasSize(2) + .element(0) + .extracting(ViewHistoryEntry::versionId) + .isEqualTo(viewVersion.versionId()); + assertThat(updatedView.history()) + .hasSize(2) + .element(1) + .extracting(ViewHistoryEntry::versionId) + .isEqualTo(updatedView.currentVersion().versionId()); + assertThat(updatedView.schemas()) + .hasSize(2) + .containsKey(SCHEMA.schemaId()) + .containsKey(OTHER_SCHEMA.schemaId()); + assertThat(updatedView.versions()) + .hasSize(2) + .containsExactly(viewVersion, updatedView.currentVersion()); + + ViewVersion updatedViewVersion = updatedView.currentVersion(); + assertThat(updatedViewVersion).isNotNull(); + assertThat(updatedViewVersion.versionId()).isEqualTo(viewVersion.versionId() + 1); + assertThat(updatedViewVersion.summary()).hasSize(1).containsEntry("operation", "replace"); + assertThat(updatedViewVersion.operation()).isEqualTo("replace"); + assertThat(updatedViewVersion.representations()).hasSize(1).containsExactly(trino); + assertThat(updatedViewVersion.schemaId()).isEqualTo(OTHER_SCHEMA.schemaId()); + assertThat(updatedViewVersion.defaultCatalog()).isEqualTo("default"); + assertThat(updatedViewVersion.defaultNamespace()).isEqualTo(identifier.namespace()); + + SQLViewRepresentation updatedSpark = + ImmutableSQLViewRepresentation.builder() + .sql("select * from ns.updated_tbl") + .dialect("spark") + .build(); + + view.replaceVersion() + .withQuery(updatedSpark.dialect(), updatedSpark.sql()) + .withDefaultNamespace(identifier.namespace()) + .withSchema(OTHER_SCHEMA) + .commit(); + + View updatedView2 = catalog().loadView(identifier); + assertThat(updatedView2.properties()).isEmpty(); + assertThat(updatedView2.history()) + .hasSize(3) + .element(0) + .extracting(ViewHistoryEntry::versionId) + .isEqualTo(viewVersion.versionId()); + assertThat(updatedView2.history()) + .element(1) + .extracting(ViewHistoryEntry::versionId) + .isEqualTo(updatedViewVersion.versionId()); + assertThat(updatedView2.history()) + .element(2) + .extracting(ViewHistoryEntry::versionId) + .isEqualTo(updatedView2.currentVersion().versionId()); + assertThat(updatedView.schemas()) + .hasSize(2) + .containsKey(SCHEMA.schemaId()) + .containsKey(OTHER_SCHEMA.schemaId()); + assertThat(updatedView2.versions()) + .hasSize(3) + .containsExactly(viewVersion, updatedViewVersion, updatedView2.currentVersion()); + + ViewVersion updatedViewVersion2 = updatedView2.currentVersion(); + assertThat(updatedViewVersion2).isNotNull(); + assertThat(updatedViewVersion2.versionId()).isEqualTo(updatedViewVersion.versionId() + 1); + assertThat(updatedViewVersion2.summary()).hasSize(1).containsEntry("operation", "replace"); + assertThat(updatedViewVersion2.operation()).isEqualTo("replace"); + assertThat(updatedViewVersion2.representations()).hasSize(1).containsExactly(updatedSpark); + + assertThat(catalog().dropView(identifier)).isTrue(); + assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse(); + } + + @Test + public void replaceViewVersionErrorCases() { + TableIdentifier identifier = TableIdentifier.of("ns", "view"); + + if (requiresNamespaceCreate()) { + catalog().createNamespace(identifier.namespace()); + } + + assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse(); + + SQLViewRepresentation trino = + ImmutableSQLViewRepresentation.builder() + .sql("select * from ns.tbl") + .dialect("trino") + .build(); + + View view = + catalog() + .buildView(identifier) + .withSchema(SCHEMA) + .withDefaultNamespace(identifier.namespace()) + .withQuery(trino.dialect(), trino.sql()) + .create(); + + // empty commits are not allowed + assertThatThrownBy(() -> view.replaceVersion().commit()) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Cannot replace view without specifying a query"); + + // schema is required + assertThatThrownBy( + () -> + view.replaceVersion() + .withQuery(trino.dialect(), trino.sql()) + .withDefaultNamespace(identifier.namespace()) + .commit()) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Cannot replace view without specifying schema"); + } +}