Skip to content

Commit

Permalink
Core: Add remaining View APIs and support for InMemoryCatalog
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Sep 22, 2023
1 parent d95bad3 commit 7972e84
Show file tree
Hide file tree
Showing 10 changed files with 1,948 additions and 5 deletions.
140 changes: 135 additions & 5 deletions core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
Expand All @@ -42,30 +41,38 @@
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.view.BaseMetastoreViewCatalog;
import org.apache.iceberg.view.BaseViewOperations;
import org.apache.iceberg.view.ViewMetadata;
import org.apache.iceberg.view.ViewUtil;

/**
* Catalog implementation that uses in-memory data-structures to store the namespaces and tables.
* This class doesn't touch external resources and can be utilized to write unit tests without side
* effects. It uses {@link InMemoryFileIO}.
*/
public class InMemoryCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Closeable {
public class InMemoryCatalog extends BaseMetastoreViewCatalog
implements SupportsNamespaces, Closeable {
private static final Joiner SLASH = Joiner.on("/");
private static final Joiner DOT = Joiner.on(".");

private final ConcurrentMap<Namespace, Map<String, String>> namespaces;
private final ConcurrentMap<TableIdentifier, String> tables;
private final ConcurrentMap<TableIdentifier, String> views;
private FileIO io;
private String catalogName;
private String warehouseLocation;

public InMemoryCatalog() {
this.namespaces = Maps.newConcurrentMap();
this.tables = Maps.newConcurrentMap();
this.views = Maps.newConcurrentMap();
}

@Override
Expand Down Expand Up @@ -278,15 +285,69 @@ public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespac
public void close() throws IOException {
namespaces.clear();
tables.clear();
views.clear();
}

@Override
public List<TableIdentifier> listViews(Namespace namespace) {
if (!namespaceExists(namespace) && !namespace.isEmpty()) {
throw new NoSuchNamespaceException(
"Cannot list views for namespace. Namespace does not exist: %s", namespace);
}

return views.keySet().stream()
.filter(v -> namespace.isEmpty() || v.namespace().equals(namespace))
.sorted(Comparator.comparing(TableIdentifier::toString))
.collect(Collectors.toList());
}

@Override
protected InMemoryViewOperations newViewOps(TableIdentifier identifier) {
return new InMemoryViewOperations(io, identifier);
}

@Override
public boolean dropView(TableIdentifier identifier) {
return null != views.remove(identifier);
}

@Override
public synchronized void renameView(TableIdentifier from, TableIdentifier to) {
if (from.equals(to)) {
return;
}

if (!namespaceExists(to.namespace())) {
throw new NoSuchNamespaceException(
"Cannot rename %s to %s. Namespace does not exist: %s", from, to, to.namespace());
}

String fromViewLocation = views.get(from);
if (null == fromViewLocation) {
throw new NoSuchViewException("Cannot rename %s to %s. View does not exist", from, to);
}

if (tables.containsKey(to)) {
throw new AlreadyExistsException("Cannot rename %s to %s. Table already exists", from, to);
}

if (views.containsKey(to)) {
throw new AlreadyExistsException("Cannot rename %s to %s. View already exists", from, to);
}

views.put(to, fromViewLocation);
views.remove(from);
}

private class InMemoryTableOperations extends BaseMetastoreTableOperations {
private final FileIO fileIO;
private final TableIdentifier tableIdentifier;
private final String fullTableName;

InMemoryTableOperations(FileIO fileIO, TableIdentifier tableIdentifier) {
this.fileIO = fileIO;
this.tableIdentifier = tableIdentifier;
this.fullTableName = fullTableName(catalogName, tableIdentifier);
}

@Override
Expand All @@ -300,8 +361,8 @@ public void doRefresh() {
}

@Override
public void doCommit(TableMetadata base, TableMetadata metadata) {
String newLocation = writeNewMetadata(metadata, currentVersion() + 1);
public synchronized void doCommit(TableMetadata base, TableMetadata metadata) {
String newLocation = writeNewMetadataIfRequired(base == null, metadata);
String oldLocation = base == null ? null : base.metadataFileLocation();

if (null == base && !namespaceExists(tableIdentifier.namespace())) {
Expand All @@ -310,6 +371,10 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
tableIdentifier, tableIdentifier.namespace());
}

if (views.containsKey(tableIdentifier)) {
throw new AlreadyExistsException("View with same name already exists: %s", tableIdentifier);
}

tables.compute(
tableIdentifier,
(k, existingLocation) -> {
Expand All @@ -334,7 +399,72 @@ public FileIO io() {

@Override
protected String tableName() {
return tableIdentifier.toString();
return fullTableName;
}
}

private class InMemoryViewOperations extends BaseViewOperations {
private final FileIO io;
private final TableIdentifier identifier;
private final String fullViewName;

InMemoryViewOperations(FileIO io, TableIdentifier identifier) {
this.io = io;
this.identifier = identifier;
this.fullViewName = ViewUtil.fullViewName(catalogName, identifier);
}

@Override
public void doRefresh() {
String latestLocation = views.get(identifier);
if (latestLocation == null) {
disableRefresh();
} else {
refreshFromMetadataLocation(latestLocation);
}
}

@Override
public synchronized void doCommit(ViewMetadata base, ViewMetadata metadata) {
String newLocation = writeNewMetadataIfRequired(metadata);
String oldLocation = base == null ? null : currentMetadataLocation();

if (null == base && !namespaceExists(identifier.namespace())) {
throw new NoSuchNamespaceException(
"Cannot create view %s. Namespace does not exist: %s",
identifier, identifier.namespace());
}

if (tables.containsKey(identifier)) {
throw new AlreadyExistsException("Table with same name already exists: %s", identifier);
}

views.compute(
identifier,
(k, existingLocation) -> {
if (!Objects.equal(existingLocation, oldLocation)) {
if (null == base) {
throw new AlreadyExistsException("View already exists: %s", identifier);
}

throw new CommitFailedException(
"Cannot commit to view %s metadata location from %s to %s "
+ "because it has been concurrently modified to %s",
identifier, oldLocation, newLocation, existingLocation);
}

return newLocation;
});
}

@Override
public FileIO io() {
return io;
}

@Override
protected String viewName() {
return fullViewName;
}
}
}
Loading

0 comments on commit 7972e84

Please sign in to comment.