Skip to content

Commit

Permalink
API, Core: Allow setting a View's location (#8648)
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra authored Oct 4, 2023
1 parent c862b91 commit d2e1094
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 6 deletions.
4 changes: 2 additions & 2 deletions api/src/main/java/org/apache/iceberg/UpdateLocation.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
*/
package org.apache.iceberg;

/** API for setting a table's base location. */
/** API for setting a table's or view's base location. */
public interface UpdateLocation extends PendingUpdate<String> {
/**
* Set the table's location.
* Set the table's or view's location.
*
* @param location a String location
* @return this for method chaining
Expand Down
19 changes: 19 additions & 0 deletions api/src/main/java/org/apache/iceberg/view/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.UpdateLocation;

/** Interface for view definition. */
public interface View {
Expand Down Expand Up @@ -77,6 +78,15 @@ public interface View {
*/
Map<String, String> properties();

/**
* Return the view's base location.
*
* @return this view's location
*/
default String location() {
throw new UnsupportedOperationException("Retrieving a view's location is not supported");
}

/**
* Create a new {@link UpdateViewProperties} to update view properties.
*
Expand All @@ -92,4 +102,13 @@ public interface View {
default ReplaceViewVersion replaceVersion() {
throw new UnsupportedOperationException("Replacing a view's version is not supported");
}

/**
* Create a new {@link UpdateLocation} to set the view's location.
*
* @return a new {@link UpdateLocation}
*/
default UpdateLocation updateLocation() {
throw new UnsupportedOperationException("Updating a view's location is not supported");
}
}
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/view/ViewBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ public interface ViewBuilder extends VersionBuilder<ViewBuilder> {
*/
ViewBuilder withProperty(String key, String value);

/**
* Sets a location for the view
*
* @param location the location to set for the view
* @return this for method chaining
*/
default ViewBuilder withLocation(String location) {
throw new UnsupportedOperationException("Setting a view's location is not supported");
}

/**
* Create the view.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ protected class BaseViewBuilder implements ViewBuilder {
private Namespace defaultNamespace = null;
private String defaultCatalog = null;
private Schema schema = null;
private String location = null;

protected BaseViewBuilder(TableIdentifier identifier) {
Preconditions.checkArgument(
Expand Down Expand Up @@ -115,6 +116,12 @@ public ViewBuilder withProperty(String key, String value) {
return this;
}

@Override
public ViewBuilder withLocation(String newLocation) {
this.location = newLocation;
return this;
}

@Override
public View create() {
return create(newViewOps(identifier));
Expand Down Expand Up @@ -160,7 +167,7 @@ private View create(ViewOperations ops) {
ViewMetadata viewMetadata =
ViewMetadata.builder()
.setProperties(properties)
.setLocation(defaultWarehouseLocation(identifier))
.setLocation(null != location ? location : defaultWarehouseLocation(identifier))
.setCurrentVersion(viewVersion, schema)
.build();

Expand Down Expand Up @@ -202,11 +209,16 @@ private View replace(ViewOperations ops) {
.putSummary("operation", "replace")
.build();

ViewMetadata replacement =
ViewMetadata.Builder builder =
ViewMetadata.buildFrom(metadata)
.setProperties(properties)
.setCurrentVersion(viewVersion, schema)
.build();
.setCurrentVersion(viewVersion, schema);

if (null != location) {
builder.setLocation(location);
}

ViewMetadata replacement = builder.build();

try {
ops.commit(metadata, replacement);
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/iceberg/view/BaseView.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.UpdateLocation;

public class BaseView implements View, Serializable {

Expand Down Expand Up @@ -77,6 +78,11 @@ public Map<String, String> properties() {
return operations().current().properties();
}

@Override
public String location() {
return operations().current().location();
}

@Override
public UpdateViewProperties updateProperties() {
return new PropertiesUpdate(ops);
Expand All @@ -86,4 +92,9 @@ public UpdateViewProperties updateProperties() {
public ReplaceViewVersion replaceVersion() {
return new ViewVersionReplace(ops);
}

@Override
public UpdateLocation updateLocation() {
return new SetViewLocation(ops);
}
}
76 changes: 76 additions & 0 deletions core/src/main/java/org/apache/iceberg/view/SetViewLocation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.view;

import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;

import org.apache.iceberg.UpdateLocation;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;

class SetViewLocation implements UpdateLocation {
private final ViewOperations ops;
private String newLocation = null;

SetViewLocation(ViewOperations ops) {
this.ops = ops;
}

@Override
public String apply() {
Preconditions.checkState(null != newLocation, "Invalid view location: null");
return newLocation;
}

@Override
public void commit() {
ViewMetadata base = ops.refresh();
Tasks.foreach(ops)
.retry(
PropertyUtil.propertyAsInt(
base.properties(), COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
.exponentialBackoff(
PropertyUtil.propertyAsInt(
base.properties(), COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
PropertyUtil.propertyAsInt(
base.properties(), COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
PropertyUtil.propertyAsInt(
base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
2.0 /* exponential */)
.onlyRetryOn(CommitFailedException.class)
.run(
taskOps ->
taskOps.commit(base, ViewMetadata.buildFrom(base).setLocation(apply()).build()));
}

@Override
public UpdateLocation setLocation(String location) {
this.newLocation = location;
return this;
}
}
112 changes: 112 additions & 0 deletions core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.iceberg.Schema;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateLocation;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
Expand Down Expand Up @@ -131,13 +132,15 @@ public void completeCreateView() {
.withQuery("trino", "select * from ns.tbl using X")
.withProperty("prop1", "val1")
.withProperty("prop2", "val2")
.withLocation("file://tmp/ns/view")
.create();

assertThat(view).isNotNull();
assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue();

// validate view settings
assertThat(view.name()).isEqualTo(ViewUtil.fullViewName(catalog().name(), identifier));
assertThat(view.location()).isEqualTo("file://tmp/ns/view");
assertThat(view.properties()).containsEntry("prop1", "val1").containsEntry("prop2", "val2");
assertThat(view.history())
.hasSize(1)
Expand Down Expand Up @@ -1337,4 +1340,113 @@ public void replaceViewConflict() {
.isInstanceOf(NoSuchViewException.class)
.hasMessageStartingWith("View does not exist: ns.view");
}

@Test
public void createAndReplaceViewWithLocation() {
TableIdentifier identifier = TableIdentifier.of("ns", "view");

if (requiresNamespaceCreate()) {
catalog().createNamespace(identifier.namespace());
}

assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();

View view =
catalog()
.buildView(identifier)
.withSchema(SCHEMA)
.withDefaultNamespace(identifier.namespace())
.withQuery("trino", "select * from ns.tbl")
.withLocation("file://tmp/ns/view")
.create();

assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue();
assertThat(view.location()).isEqualTo("file://tmp/ns/view");

view =
catalog()
.buildView(identifier)
.withSchema(SCHEMA)
.withDefaultNamespace(identifier.namespace())
.withQuery("trino", "select * from ns.tbl")
.withLocation("file://updated_tmp/ns/view")
.replace();

assertThat(view.location()).isEqualTo("file://updated_tmp/ns/view");

assertThat(catalog().dropView(identifier)).isTrue();
assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();
}

@Test
public void updateViewLocation() {
TableIdentifier identifier = TableIdentifier.of("ns", "view");

if (requiresNamespaceCreate()) {
catalog().createNamespace(identifier.namespace());
}

assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();

View view =
catalog()
.buildView(identifier)
.withSchema(SCHEMA)
.withDefaultNamespace(identifier.namespace())
.withQuery("trino", "select * from ns.tbl")
.withLocation("file://tmp/ns/view")
.create();

assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue();
assertThat(view.location()).isEqualTo("file://tmp/ns/view");

view.updateLocation().setLocation("file://updated_tmp/ns/view").commit();

View updatedView = catalog().loadView(identifier);

assertThat(updatedView.location()).isEqualTo("file://updated_tmp/ns/view");

// history and view versions should stay the same after updating view properties
assertThat(updatedView.history()).hasSize(1).isEqualTo(view.history());
assertThat(updatedView.versions()).hasSize(1).containsExactly(view.currentVersion());

assertThat(catalog().dropView(identifier)).isTrue();
assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();
}

@Test
public void updateViewLocationConflict() {
TableIdentifier identifier = TableIdentifier.of("ns", "view");

if (requiresNamespaceCreate()) {
catalog().createNamespace(identifier.namespace());
}

assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();

View view =
catalog()
.buildView(identifier)
.withSchema(SCHEMA)
.withDefaultNamespace(identifier.namespace())
.withQuery("trino", "select * from ns.tbl")
.create();

assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue();

// new location must be non-null
assertThatThrownBy(() -> view.updateLocation().setLocation(null).commit())
.isInstanceOf(IllegalStateException.class)
.hasMessage("Invalid view location: null");

UpdateLocation updateViewLocation = view.updateLocation();

catalog().dropView(identifier);
assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();

// the view was already dropped concurrently
assertThatThrownBy(() -> updateViewLocation.setLocation("new-location").commit())
.isInstanceOf(CommitFailedException.class)
.hasMessageContaining("Cannot commit");
}
}

0 comments on commit d2e1094

Please sign in to comment.