Skip to content

Commit

Permalink
Core: Add remaining View APIs and support for InMemoryCatalog (#7880)
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra authored Sep 28, 2023
1 parent eaf7c4f commit 6100efc
Show file tree
Hide file tree
Showing 14 changed files with 2,649 additions and 72 deletions.
298 changes: 226 additions & 72 deletions core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.view;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public abstract class BaseMetastoreViewCatalog extends BaseMetastoreCatalog implements ViewCatalog {
protected abstract ViewOperations newViewOps(TableIdentifier identifier);

@Override
public void initialize(String name, Map<String, String> properties) {
super.initialize(name, properties);
}

@Override
public String name() {
return super.name();
}

@Override
public View loadView(TableIdentifier identifier) {
if (isValidIdentifier(identifier)) {
ViewOperations ops = newViewOps(identifier);
if (ops.current() == null) {
throw new NoSuchViewException("View does not exist: %s", identifier);
} else {
return new BaseView(newViewOps(identifier), ViewUtil.fullViewName(name(), identifier));
}
}

throw new NoSuchViewException("Invalid view identifier: %s", identifier);
}

@Override
public ViewBuilder buildView(TableIdentifier identifier) {
return new BaseViewBuilder(identifier);
}

protected class BaseViewBuilder implements ViewBuilder {
private final TableIdentifier identifier;
private final Map<String, String> properties = Maps.newHashMap();
private final List<ViewRepresentation> representations = Lists.newArrayList();
private Namespace defaultNamespace = null;
private String defaultCatalog = null;
private Schema schema = null;

protected BaseViewBuilder(TableIdentifier identifier) {
Preconditions.checkArgument(
isValidIdentifier(identifier), "Invalid view identifier: %s", identifier);
this.identifier = identifier;
}

@Override
public ViewBuilder withSchema(Schema newSchema) {
this.schema = newSchema;
return this;
}

@Override
public ViewBuilder withQuery(String dialect, String sql) {
representations.add(
ImmutableSQLViewRepresentation.builder().dialect(dialect).sql(sql).build());
return this;
}

@Override
public ViewBuilder withDefaultCatalog(String catalog) {
this.defaultCatalog = catalog;
return this;
}

@Override
public ViewBuilder withDefaultNamespace(Namespace namespace) {
this.defaultNamespace = namespace;
return this;
}

@Override
public ViewBuilder withProperties(Map<String, String> newProperties) {
this.properties.putAll(newProperties);
return this;
}

@Override
public ViewBuilder withProperty(String key, String value) {
this.properties.put(key, value);
return this;
}

@Override
public View create() {
return create(newViewOps(identifier));
}

@Override
public View replace() {
return replace(newViewOps(identifier));
}

@Override
public View createOrReplace() {
ViewOperations ops = newViewOps(identifier);
if (null == ops.current()) {
return create(ops);
} else {
return replace(ops);
}
}

private View create(ViewOperations ops) {
if (null != ops.current()) {
throw new AlreadyExistsException("View already exists: %s", identifier);
}

Preconditions.checkState(
!representations.isEmpty(), "Cannot create view without specifying a query");
Preconditions.checkState(null != schema, "Cannot create view without specifying schema");
Preconditions.checkState(
null != defaultNamespace, "Cannot create view without specifying a default namespace");

ViewVersion viewVersion =
ImmutableViewVersion.builder()
.versionId(1)
.schemaId(schema.schemaId())
.addAllRepresentations(representations)
.defaultNamespace(defaultNamespace)
.defaultCatalog(defaultCatalog)
.timestampMillis(System.currentTimeMillis())
.putSummary("operation", "create")
.build();

ViewMetadata viewMetadata =
ViewMetadata.builder()
.setProperties(properties)
.setLocation(defaultWarehouseLocation(identifier))
.setCurrentVersion(viewVersion, schema)
.build();

try {
ops.commit(null, viewMetadata);
} catch (CommitFailedException ignored) {
throw new AlreadyExistsException("View was created concurrently: %s", identifier);
}

return new BaseView(ops, ViewUtil.fullViewName(name(), identifier));
}

private View replace(ViewOperations ops) {
if (null == ops.current()) {
throw new NoSuchViewException("View does not exist: %s", identifier);
}

Preconditions.checkState(
!representations.isEmpty(), "Cannot replace view without specifying a query");
Preconditions.checkState(null != schema, "Cannot replace view without specifying schema");
Preconditions.checkState(
null != defaultNamespace, "Cannot replace view without specifying a default namespace");

ViewMetadata metadata = ops.current();
int maxVersionId =
metadata.versions().stream()
.map(ViewVersion::versionId)
.max(Integer::compareTo)
.orElseGet(metadata::currentVersionId);

ViewVersion viewVersion =
ImmutableViewVersion.builder()
.versionId(maxVersionId + 1)
.schemaId(schema.schemaId())
.addAllRepresentations(representations)
.defaultNamespace(defaultNamespace)
.defaultCatalog(defaultCatalog)
.timestampMillis(System.currentTimeMillis())
.putSummary("operation", "replace")
.build();

ViewMetadata replacement =
ViewMetadata.buildFrom(metadata)
.setProperties(properties)
.setCurrentVersion(viewVersion, schema)
.build();

try {
ops.commit(metadata, replacement);
} catch (CommitFailedException ignored) {
throw new AlreadyExistsException("View was updated concurrently: %s", identifier);
}

return new BaseView(ops, ViewUtil.fullViewName(name(), identifier));
}
}
}
89 changes: 89 additions & 0 deletions core/src/main/java/org/apache/iceberg/view/BaseView.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.view;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;

public class BaseView implements View, Serializable {

private final ViewOperations ops;
private final String name;

public BaseView(ViewOperations ops, String name) {
this.ops = ops;
this.name = name;
}

@Override
public String name() {
return name;
}

public ViewOperations operations() {
return ops;
}

@Override
public Schema schema() {
return operations().current().schema();
}

@Override
public Map<Integer, Schema> schemas() {
return operations().current().schemasById();
}

@Override
public ViewVersion currentVersion() {
return operations().current().currentVersion();
}

@Override
public Iterable<ViewVersion> versions() {
return operations().current().versions();
}

@Override
public ViewVersion version(int versionId) {
return operations().current().version(versionId);
}

@Override
public List<ViewHistoryEntry> history() {
return operations().current().history();
}

@Override
public Map<String, String> properties() {
return operations().current().properties();
}

@Override
public UpdateViewProperties updateProperties() {
return new PropertiesUpdate(ops);
}

@Override
public ReplaceViewVersion replaceVersion() {
return new ViewVersionReplace(ops);
}
}
Loading

0 comments on commit 6100efc

Please sign in to comment.